From f78c022f7520682bcc0645f428c237c2dffff626 Mon Sep 17 00:00:00 2001 From: imbytecat Date: Fri, 6 Mar 2026 06:54:20 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BD=BF=E7=94=A8=20PartySocket=20?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=BF=9E=E6=8E=A5=E9=9F=A7=E6=80=A7=E5=B9=B6?= =?UTF-8?q?=E5=BA=93=E5=8C=96=E9=9F=B3=E9=A2=91=E5=B0=81=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/bun.lock | 3 + web/package.json | 1 + web/src/hooks/useWebSocket.ts | 379 ++++++++++++++++++++++++++++++---- 3 files changed, 345 insertions(+), 38 deletions(-) diff --git a/web/bun.lock b/web/bun.lock index bafeff6..13b0ac3 100644 --- a/web/bun.lock +++ b/web/bun.lock @@ -5,6 +5,7 @@ "": { "name": "web", "dependencies": { + "@msgpack/msgpack": "^3.1.3", "@picovoice/web-voice-processor": "^4.0.9", "partysocket": "^1.1.16", "react": "^19.2.4", @@ -297,6 +298,8 @@ "@jridgewell/trace-mapping": ["@jridgewell/trace-mapping@0.3.31", "", { "dependencies": { "@jridgewell/resolve-uri": "^3.1.0", "@jridgewell/sourcemap-codec": "^1.4.14" } }, "sha512-zzNR+SdQSDJzc8joaeP8QQoCQr8NuYx2dIIytl1QeBEZHJ9uW6hebsrYgbz8hJwUQao3TWCMtmfV8Nu1twOLAw=="], + "@msgpack/msgpack": ["@msgpack/msgpack@3.1.3", "", {}, "sha512-47XIizs9XZXvuJgoaJUIE2lFoID8ugvc0jzSHP+Ptfk8nTbnR8g788wv48N03Kx0UkAv559HWRQ3yzOgzlRNUA=="], + "@picovoice/web-utils": ["@picovoice/web-utils@1.3.1", "", { "dependencies": { "commander": "^9.2.0" }, "bin": { "pvbase64": "scripts/base64.js" } }, "sha512-jcDqdULtTm+yJrnHDjg64hARup+Z4wNkYuXHNx6EM8+qZkweBq9UA6XJrHAlUkPnlkso4JWjaIKhz3x8vZcd3g=="], "@picovoice/web-voice-processor": ["@picovoice/web-voice-processor@4.0.9", "", { "dependencies": { "@picovoice/web-utils": "=1.3.1" } }, "sha512-20pdkFjtuiojAdLIkNHXt4YgpRnlUePFW+gfkeCb+J+2XTRDGOI50+aJzL95p6QjDzGXsO7PZhlz7yDofOvZtg=="], diff --git a/web/package.json b/web/package.json index 734e807..6a18299 100644 --- a/web/package.json +++ b/web/package.json @@ -22,6 +22,7 @@ "vite-plugin-pwa": "^1.2.0" }, "dependencies": { + "@msgpack/msgpack": "^3.1.3", "@picovoice/web-voice-processor": "^4.0.9", "partysocket": "^1.1.16", "react": "^19.2.4", diff --git a/web/src/hooks/useWebSocket.ts b/web/src/hooks/useWebSocket.ts index 7754292..72a8807 100644 --- a/web/src/hooks/useWebSocket.ts +++ b/web/src/hooks/useWebSocket.ts @@ -1,8 +1,31 @@ +import { encode } from "@msgpack/msgpack"; import { WebSocket as ReconnectingWebSocket } from "partysocket"; import { useCallback, useEffect, useRef } from "react"; import { toast } from "sonner"; +import type { ClientMsg, ServerMsg } from "../protocol"; import { useAppStore } from "../stores/app-store"; +const HEARTBEAT_INTERVAL_MS = 15000; +const HEARTBEAT_TIMEOUT_MS = 10000; +const START_TIMEOUT_MS = 5000; +const AUDIO_PACKET_VERSION = 1; +const MAX_AUDIO_QUEUE = 6; +const WEAK_NETWORK_TOAST_INTERVAL_MS = 4000; +const WS_BACKPRESSURE_BYTES = 128 * 1024; + +interface QueuedAudioPacket { + sessionId: string; + seq: number; + buffer: ArrayBuffer; +} + +interface AudioPacketPayload { + v: number; + sessionId: string; + seq: number; + pcm: Uint8Array; +} + function getWsUrl(): string { const proto = location.protocol === "https:" ? "wss:" : "ws:"; const params = new URLSearchParams(location.search); @@ -11,75 +34,355 @@ function getWsUrl(): string { return `${proto}//${location.host}/ws${q}`; } +function createSessionId(): string { + if (typeof crypto !== "undefined" && "randomUUID" in crypto) { + return crypto.randomUUID(); + } + return `${Date.now()}-${Math.random().toString(16).slice(2)}`; +} + +function encodeAudioPacket( + sessionId: string, + seq: number, + pcm: Int16Array, +): ArrayBuffer { + const sidBytes = new TextEncoder().encode(sessionId); + if (sidBytes.length === 0 || sidBytes.length > 96) { + throw new Error("invalid sessionId length"); + } + + const pcmBytes = new Uint8Array(pcm.buffer, pcm.byteOffset, pcm.byteLength); + const payload: AudioPacketPayload = { + v: AUDIO_PACKET_VERSION, + sessionId, + seq, + pcm: pcmBytes, + }; + const packed = encode(payload); + return packed.buffer.slice( + packed.byteOffset, + packed.byteOffset + packed.byteLength, + ); +} + export function useWebSocket() { const wsRef = useRef(null); + const heartbeatTimerRef = useRef(null); + const heartbeatTimeoutRef = useRef(null); + const pendingStartRef = useRef<{ + sessionId: string; + resolve: (sessionId: string | null) => void; + timer: number; + } | null>(null); + const audioQueueRef = useRef([]); + const lastWeakToastAtRef = useRef(0); - const sendJSON = useCallback((obj: Record) => { - const ws = wsRef.current; - if (ws?.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(obj)); + const clearHeartbeat = useCallback(() => { + if (heartbeatTimerRef.current !== null) { + window.clearInterval(heartbeatTimerRef.current); + heartbeatTimerRef.current = null; + } + if (heartbeatTimeoutRef.current !== null) { + window.clearTimeout(heartbeatTimeoutRef.current); + heartbeatTimeoutRef.current = null; } }, []); - const sendBinary = useCallback((data: Int16Array) => { + const sendControl = useCallback((msg: ClientMsg): boolean => { const ws = wsRef.current; if (ws?.readyState === WebSocket.OPEN) { - ws.send(data.buffer as ArrayBuffer); + ws.send(JSON.stringify(msg)); + return true; + } + return false; + }, []); + + const notifyWeakNetwork = useCallback(() => { + const now = Date.now(); + if (now - lastWeakToastAtRef.current < WEAK_NETWORK_TOAST_INTERVAL_MS) { + return; + } + lastWeakToastAtRef.current = now; + toast.warning("网络波动,正在缓冲音频…"); + }, []); + + const flushAudioQueue = useCallback(() => { + const ws = wsRef.current; + if (ws?.readyState !== WebSocket.OPEN) return; + + const activeSessionId = useAppStore.getState().activeSessionId; + if (!activeSessionId) { + audioQueueRef.current = []; + useAppStore.getState().setWeakNetwork(false); + return; + } + + const remaining: QueuedAudioPacket[] = []; + for (const packet of audioQueueRef.current) { + if (packet.sessionId !== activeSessionId) { + continue; + } + if ( + ws.readyState !== WebSocket.OPEN || + ws.bufferedAmount > WS_BACKPRESSURE_BYTES + ) { + remaining.push(packet); + continue; + } + ws.send(packet.buffer); + } + + audioQueueRef.current = remaining; + if (audioQueueRef.current.length === 0) { + useAppStore.getState().setWeakNetwork(false); } }, []); + const sendAudioFrame = useCallback( + (sessionId: string, seq: number, pcm: Int16Array): boolean => { + const store = useAppStore.getState(); + if (!store.recording || store.activeSessionId !== sessionId) { + return false; + } + + let buffer: ArrayBuffer; + try { + buffer = encodeAudioPacket(sessionId, seq, pcm); + } catch { + return false; + } + + const ws = wsRef.current; + if ( + ws?.readyState === WebSocket.OPEN && + ws.bufferedAmount <= WS_BACKPRESSURE_BYTES + ) { + ws.send(buffer); + if (audioQueueRef.current.length === 0) { + useAppStore.getState().setWeakNetwork(false); + } + return true; + } + + if (audioQueueRef.current.length >= MAX_AUDIO_QUEUE) { + audioQueueRef.current.shift(); + } + audioQueueRef.current.push({ sessionId, seq, buffer }); + useAppStore.getState().setWeakNetwork(true); + notifyWeakNetwork(); + return false; + }, + [notifyWeakNetwork], + ); + + const resolvePendingStart = useCallback((sessionId: string | null) => { + const pending = pendingStartRef.current; + if (!pending) return; + window.clearTimeout(pending.timer); + pending.resolve(sessionId); + pendingStartRef.current = null; + }, []); + + const handleServerMessage = useCallback( + (raw: string) => { + let msg: ServerMsg; + try { + msg = JSON.parse(raw) as ServerMsg; + } catch { + return; + } + + const store = useAppStore.getState(); + switch (msg.type) { + case "ready": + break; + case "state": + if (msg.state === "idle") { + store.setRecording(false); + store.setStopping(false); + store.setPendingStart(false); + store.setActiveSessionId(null); + } + if (msg.state === "stopping") { + store.setStopping(true); + } + break; + case "start_ack": + if ( + pendingStartRef.current && + msg.sessionId === pendingStartRef.current.sessionId + ) { + resolvePendingStart(msg.sessionId ?? null); + } + break; + case "stop_ack": + store.setStopping(true); + break; + case "partial": { + const activeSessionId = store.activeSessionId; + if (!activeSessionId || msg.sessionId !== activeSessionId) break; + store.setPreview(msg.text || "", false); + break; + } + case "final": { + const activeSessionId = store.activeSessionId; + if (!activeSessionId || msg.sessionId !== activeSessionId) break; + store.setPreview(msg.text || "", true); + if (msg.text) store.addHistory(msg.text); + break; + } + case "pasted": + toast.success("已粘贴"); + break; + case "error": + store.setRecording(false); + store.setPendingStart(false); + store.setStopping(false); + resolvePendingStart(null); + toast.error(msg.message || "服务错误"); + break; + case "pong": + if (heartbeatTimeoutRef.current !== null) { + window.clearTimeout(heartbeatTimeoutRef.current); + heartbeatTimeoutRef.current = null; + } + break; + } + }, + [resolvePendingStart], + ); + + const startHeartbeat = useCallback(() => { + clearHeartbeat(); + heartbeatTimerRef.current = window.setInterval(() => { + if (document.hidden) return; + const now = Date.now(); + if (!sendControl({ type: "ping", ts: now })) return; + if (heartbeatTimeoutRef.current !== null) { + window.clearTimeout(heartbeatTimeoutRef.current); + } + heartbeatTimeoutRef.current = window.setTimeout(() => { + wsRef.current?.close(); + }, HEARTBEAT_TIMEOUT_MS); + }, HEARTBEAT_INTERVAL_MS); + }, [clearHeartbeat, sendControl]); + useEffect(() => { - useAppStore.getState().setConnectionStatus("connecting"); - const ws = new ReconnectingWebSocket(getWsUrl(), undefined, { - minReconnectionDelay: 1000, - maxReconnectionDelay: 16000, + minReconnectionDelay: 400, + maxReconnectionDelay: 6000, }); ws.binaryType = "arraybuffer"; + wsRef.current = ws; + useAppStore.getState().setConnectionStatus("connecting"); ws.onopen = () => { - useAppStore.getState().setConnectionStatus("connected"); + const store = useAppStore.getState(); + store.setConnectionStatus("connected"); + store.setWeakNetwork(false); + sendControl({ type: "hello", version: 2 }); + startHeartbeat(); + flushAudioQueue(); }; ws.onmessage = (e: MessageEvent) => { - if (typeof e.data !== "string") return; - try { - const msg = JSON.parse(e.data); - const store = useAppStore.getState(); - switch (msg.type) { - case "partial": - store.setPreview(msg.text || "", false); - break; - case "final": - store.setPreview(msg.text || "", true); - if (msg.text) store.addHistory(msg.text); - break; - case "pasted": - toast.success("已粘贴"); - break; - case "error": - toast.error(msg.message || "错误"); - break; - } - } catch { - // Ignore malformed messages + if (typeof e.data === "string") { + handleServerMessage(e.data); } }; ws.onclose = () => { + clearHeartbeat(); + resolvePendingStart(null); const store = useAppStore.getState(); + if (store.recording || store.pendingStart) { + toast.warning("连接中断,本次录音已结束"); + } store.setConnectionStatus("disconnected"); - if (store.recording) store.setRecording(false); - if (store.pendingStart) store.setPendingStart(false); + store.setWeakNetwork(store.recording || audioQueueRef.current.length > 0); + store.setRecording(false); + store.setPendingStart(false); + store.setStopping(false); + store.setActiveSessionId(null); + audioQueueRef.current = []; }; - wsRef.current = ws; + ws.onerror = () => { + ws.close(); + }; + + const onVisibilityChange = () => { + if (!document.hidden && wsRef.current?.readyState !== WebSocket.OPEN) { + wsRef.current?.close(); + } + }; + document.addEventListener("visibilitychange", onVisibilityChange); return () => { - ws.close(); + document.removeEventListener("visibilitychange", onVisibilityChange); + clearHeartbeat(); + resolvePendingStart(null); + wsRef.current?.close(); wsRef.current = null; }; - }, []); + }, [ + clearHeartbeat, + flushAudioQueue, + handleServerMessage, + resolvePendingStart, + sendControl, + startHeartbeat, + ]); - return { sendJSON, sendBinary }; + const requestStart = useCallback((): Promise => { + const sessionId = createSessionId(); + return new Promise((resolve) => { + if (pendingStartRef.current) { + resolve(null); + return; + } + + const timer = window.setTimeout(() => { + if (pendingStartRef.current?.sessionId === sessionId) { + pendingStartRef.current = null; + resolve(null); + } + }, START_TIMEOUT_MS); + + pendingStartRef.current = { sessionId, resolve, timer }; + const ok = sendControl({ type: "start", sessionId, version: 2 }); + if (!ok) { + resolvePendingStart(null); + } + }); + }, [resolvePendingStart, sendControl]); + + const sendStop = useCallback( + (sessionId: string | null) => { + if (!sessionId) return; + audioQueueRef.current = audioQueueRef.current.filter( + (packet) => packet.sessionId !== sessionId, + ); + if (audioQueueRef.current.length === 0) { + useAppStore.getState().setWeakNetwork(false); + } + sendControl({ type: "stop", sessionId }); + }, + [sendControl], + ); + + const sendPaste = useCallback( + (text: string) => { + const store = useAppStore.getState(); + if (store.connectionStatus !== "connected") { + toast.warning("当前未连接,无法发送粘贴"); + return; + } + const sessionId = useAppStore.getState().activeSessionId ?? undefined; + sendControl({ type: "paste", text, sessionId }); + }, + [sendControl], + ); + + return { requestStart, sendStop, sendPaste, sendAudioFrame }; }