refactor: 使用 PartySocket 重构连接韧性并库化音频封包

This commit is contained in:
2026-03-06 06:54:20 +08:00
parent 7cf48246f2
commit f78c022f75
3 changed files with 345 additions and 38 deletions

View File

@@ -5,6 +5,7 @@
"": { "": {
"name": "web", "name": "web",
"dependencies": { "dependencies": {
"@msgpack/msgpack": "^3.1.3",
"@picovoice/web-voice-processor": "^4.0.9", "@picovoice/web-voice-processor": "^4.0.9",
"partysocket": "^1.1.16", "partysocket": "^1.1.16",
"react": "^19.2.4", "react": "^19.2.4",
@@ -297,6 +298,8 @@
"@jridgewell/trace-mapping": ["@jridgewell/trace-mapping@0.3.31", "", { "dependencies": { "@jridgewell/resolve-uri": "^3.1.0", "@jridgewell/sourcemap-codec": "^1.4.14" } }, "sha512-zzNR+SdQSDJzc8joaeP8QQoCQr8NuYx2dIIytl1QeBEZHJ9uW6hebsrYgbz8hJwUQao3TWCMtmfV8Nu1twOLAw=="], "@jridgewell/trace-mapping": ["@jridgewell/trace-mapping@0.3.31", "", { "dependencies": { "@jridgewell/resolve-uri": "^3.1.0", "@jridgewell/sourcemap-codec": "^1.4.14" } }, "sha512-zzNR+SdQSDJzc8joaeP8QQoCQr8NuYx2dIIytl1QeBEZHJ9uW6hebsrYgbz8hJwUQao3TWCMtmfV8Nu1twOLAw=="],
"@msgpack/msgpack": ["@msgpack/msgpack@3.1.3", "", {}, "sha512-47XIizs9XZXvuJgoaJUIE2lFoID8ugvc0jzSHP+Ptfk8nTbnR8g788wv48N03Kx0UkAv559HWRQ3yzOgzlRNUA=="],
"@picovoice/web-utils": ["@picovoice/web-utils@1.3.1", "", { "dependencies": { "commander": "^9.2.0" }, "bin": { "pvbase64": "scripts/base64.js" } }, "sha512-jcDqdULtTm+yJrnHDjg64hARup+Z4wNkYuXHNx6EM8+qZkweBq9UA6XJrHAlUkPnlkso4JWjaIKhz3x8vZcd3g=="], "@picovoice/web-utils": ["@picovoice/web-utils@1.3.1", "", { "dependencies": { "commander": "^9.2.0" }, "bin": { "pvbase64": "scripts/base64.js" } }, "sha512-jcDqdULtTm+yJrnHDjg64hARup+Z4wNkYuXHNx6EM8+qZkweBq9UA6XJrHAlUkPnlkso4JWjaIKhz3x8vZcd3g=="],
"@picovoice/web-voice-processor": ["@picovoice/web-voice-processor@4.0.9", "", { "dependencies": { "@picovoice/web-utils": "=1.3.1" } }, "sha512-20pdkFjtuiojAdLIkNHXt4YgpRnlUePFW+gfkeCb+J+2XTRDGOI50+aJzL95p6QjDzGXsO7PZhlz7yDofOvZtg=="], "@picovoice/web-voice-processor": ["@picovoice/web-voice-processor@4.0.9", "", { "dependencies": { "@picovoice/web-utils": "=1.3.1" } }, "sha512-20pdkFjtuiojAdLIkNHXt4YgpRnlUePFW+gfkeCb+J+2XTRDGOI50+aJzL95p6QjDzGXsO7PZhlz7yDofOvZtg=="],

View File

@@ -22,6 +22,7 @@
"vite-plugin-pwa": "^1.2.0" "vite-plugin-pwa": "^1.2.0"
}, },
"dependencies": { "dependencies": {
"@msgpack/msgpack": "^3.1.3",
"@picovoice/web-voice-processor": "^4.0.9", "@picovoice/web-voice-processor": "^4.0.9",
"partysocket": "^1.1.16", "partysocket": "^1.1.16",
"react": "^19.2.4", "react": "^19.2.4",

View File

@@ -1,8 +1,31 @@
import { encode } from "@msgpack/msgpack";
import { WebSocket as ReconnectingWebSocket } from "partysocket"; import { WebSocket as ReconnectingWebSocket } from "partysocket";
import { useCallback, useEffect, useRef } from "react"; import { useCallback, useEffect, useRef } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import type { ClientMsg, ServerMsg } from "../protocol";
import { useAppStore } from "../stores/app-store"; import { useAppStore } from "../stores/app-store";
const HEARTBEAT_INTERVAL_MS = 15000;
const HEARTBEAT_TIMEOUT_MS = 10000;
const START_TIMEOUT_MS = 5000;
const AUDIO_PACKET_VERSION = 1;
const MAX_AUDIO_QUEUE = 6;
const WEAK_NETWORK_TOAST_INTERVAL_MS = 4000;
const WS_BACKPRESSURE_BYTES = 128 * 1024;
interface QueuedAudioPacket {
sessionId: string;
seq: number;
buffer: ArrayBuffer;
}
interface AudioPacketPayload {
v: number;
sessionId: string;
seq: number;
pcm: Uint8Array;
}
function getWsUrl(): string { function getWsUrl(): string {
const proto = location.protocol === "https:" ? "wss:" : "ws:"; const proto = location.protocol === "https:" ? "wss:" : "ws:";
const params = new URLSearchParams(location.search); const params = new URLSearchParams(location.search);
@@ -11,75 +34,355 @@ function getWsUrl(): string {
return `${proto}//${location.host}/ws${q}`; return `${proto}//${location.host}/ws${q}`;
} }
function createSessionId(): string {
if (typeof crypto !== "undefined" && "randomUUID" in crypto) {
return crypto.randomUUID();
}
return `${Date.now()}-${Math.random().toString(16).slice(2)}`;
}
function encodeAudioPacket(
sessionId: string,
seq: number,
pcm: Int16Array,
): ArrayBuffer {
const sidBytes = new TextEncoder().encode(sessionId);
if (sidBytes.length === 0 || sidBytes.length > 96) {
throw new Error("invalid sessionId length");
}
const pcmBytes = new Uint8Array(pcm.buffer, pcm.byteOffset, pcm.byteLength);
const payload: AudioPacketPayload = {
v: AUDIO_PACKET_VERSION,
sessionId,
seq,
pcm: pcmBytes,
};
const packed = encode(payload);
return packed.buffer.slice(
packed.byteOffset,
packed.byteOffset + packed.byteLength,
);
}
export function useWebSocket() { export function useWebSocket() {
const wsRef = useRef<ReconnectingWebSocket | null>(null); const wsRef = useRef<ReconnectingWebSocket | null>(null);
const heartbeatTimerRef = useRef<number | null>(null);
const heartbeatTimeoutRef = useRef<number | null>(null);
const pendingStartRef = useRef<{
sessionId: string;
resolve: (sessionId: string | null) => void;
timer: number;
} | null>(null);
const audioQueueRef = useRef<QueuedAudioPacket[]>([]);
const lastWeakToastAtRef = useRef(0);
const sendJSON = useCallback((obj: Record<string, unknown>) => { const clearHeartbeat = useCallback(() => {
const ws = wsRef.current; if (heartbeatTimerRef.current !== null) {
if (ws?.readyState === WebSocket.OPEN) { window.clearInterval(heartbeatTimerRef.current);
ws.send(JSON.stringify(obj)); heartbeatTimerRef.current = null;
}
if (heartbeatTimeoutRef.current !== null) {
window.clearTimeout(heartbeatTimeoutRef.current);
heartbeatTimeoutRef.current = null;
} }
}, []); }, []);
const sendBinary = useCallback((data: Int16Array) => { const sendControl = useCallback((msg: ClientMsg): boolean => {
const ws = wsRef.current; const ws = wsRef.current;
if (ws?.readyState === WebSocket.OPEN) { if (ws?.readyState === WebSocket.OPEN) {
ws.send(data.buffer as ArrayBuffer); ws.send(JSON.stringify(msg));
return true;
}
return false;
}, []);
const notifyWeakNetwork = useCallback(() => {
const now = Date.now();
if (now - lastWeakToastAtRef.current < WEAK_NETWORK_TOAST_INTERVAL_MS) {
return;
}
lastWeakToastAtRef.current = now;
toast.warning("网络波动,正在缓冲音频…");
}, []);
const flushAudioQueue = useCallback(() => {
const ws = wsRef.current;
if (ws?.readyState !== WebSocket.OPEN) return;
const activeSessionId = useAppStore.getState().activeSessionId;
if (!activeSessionId) {
audioQueueRef.current = [];
useAppStore.getState().setWeakNetwork(false);
return;
}
const remaining: QueuedAudioPacket[] = [];
for (const packet of audioQueueRef.current) {
if (packet.sessionId !== activeSessionId) {
continue;
}
if (
ws.readyState !== WebSocket.OPEN ||
ws.bufferedAmount > WS_BACKPRESSURE_BYTES
) {
remaining.push(packet);
continue;
}
ws.send(packet.buffer);
}
audioQueueRef.current = remaining;
if (audioQueueRef.current.length === 0) {
useAppStore.getState().setWeakNetwork(false);
} }
}, []); }, []);
useEffect(() => { const sendAudioFrame = useCallback(
useAppStore.getState().setConnectionStatus("connecting"); (sessionId: string, seq: number, pcm: Int16Array): boolean => {
const store = useAppStore.getState();
if (!store.recording || store.activeSessionId !== sessionId) {
return false;
}
const ws = new ReconnectingWebSocket(getWsUrl(), undefined, { let buffer: ArrayBuffer;
minReconnectionDelay: 1000,
maxReconnectionDelay: 16000,
});
ws.binaryType = "arraybuffer";
ws.onopen = () => {
useAppStore.getState().setConnectionStatus("connected");
};
ws.onmessage = (e: MessageEvent) => {
if (typeof e.data !== "string") return;
try { try {
const msg = JSON.parse(e.data); buffer = encodeAudioPacket(sessionId, seq, pcm);
} catch {
return false;
}
const ws = wsRef.current;
if (
ws?.readyState === WebSocket.OPEN &&
ws.bufferedAmount <= WS_BACKPRESSURE_BYTES
) {
ws.send(buffer);
if (audioQueueRef.current.length === 0) {
useAppStore.getState().setWeakNetwork(false);
}
return true;
}
if (audioQueueRef.current.length >= MAX_AUDIO_QUEUE) {
audioQueueRef.current.shift();
}
audioQueueRef.current.push({ sessionId, seq, buffer });
useAppStore.getState().setWeakNetwork(true);
notifyWeakNetwork();
return false;
},
[notifyWeakNetwork],
);
const resolvePendingStart = useCallback((sessionId: string | null) => {
const pending = pendingStartRef.current;
if (!pending) return;
window.clearTimeout(pending.timer);
pending.resolve(sessionId);
pendingStartRef.current = null;
}, []);
const handleServerMessage = useCallback(
(raw: string) => {
let msg: ServerMsg;
try {
msg = JSON.parse(raw) as ServerMsg;
} catch {
return;
}
const store = useAppStore.getState(); const store = useAppStore.getState();
switch (msg.type) { switch (msg.type) {
case "partial": case "ready":
break;
case "state":
if (msg.state === "idle") {
store.setRecording(false);
store.setStopping(false);
store.setPendingStart(false);
store.setActiveSessionId(null);
}
if (msg.state === "stopping") {
store.setStopping(true);
}
break;
case "start_ack":
if (
pendingStartRef.current &&
msg.sessionId === pendingStartRef.current.sessionId
) {
resolvePendingStart(msg.sessionId ?? null);
}
break;
case "stop_ack":
store.setStopping(true);
break;
case "partial": {
const activeSessionId = store.activeSessionId;
if (!activeSessionId || msg.sessionId !== activeSessionId) break;
store.setPreview(msg.text || "", false); store.setPreview(msg.text || "", false);
break; break;
case "final": }
case "final": {
const activeSessionId = store.activeSessionId;
if (!activeSessionId || msg.sessionId !== activeSessionId) break;
store.setPreview(msg.text || "", true); store.setPreview(msg.text || "", true);
if (msg.text) store.addHistory(msg.text); if (msg.text) store.addHistory(msg.text);
break; break;
}
case "pasted": case "pasted":
toast.success("已粘贴"); toast.success("已粘贴");
break; break;
case "error": case "error":
toast.error(msg.message || "错误"); store.setRecording(false);
store.setPendingStart(false);
store.setStopping(false);
resolvePendingStart(null);
toast.error(msg.message || "服务错误");
break;
case "pong":
if (heartbeatTimeoutRef.current !== null) {
window.clearTimeout(heartbeatTimeoutRef.current);
heartbeatTimeoutRef.current = null;
}
break; break;
} }
} catch { },
// Ignore malformed messages [resolvePendingStart],
);
const startHeartbeat = useCallback(() => {
clearHeartbeat();
heartbeatTimerRef.current = window.setInterval(() => {
if (document.hidden) return;
const now = Date.now();
if (!sendControl({ type: "ping", ts: now })) return;
if (heartbeatTimeoutRef.current !== null) {
window.clearTimeout(heartbeatTimeoutRef.current);
}
heartbeatTimeoutRef.current = window.setTimeout(() => {
wsRef.current?.close();
}, HEARTBEAT_TIMEOUT_MS);
}, HEARTBEAT_INTERVAL_MS);
}, [clearHeartbeat, sendControl]);
useEffect(() => {
const ws = new ReconnectingWebSocket(getWsUrl(), undefined, {
minReconnectionDelay: 400,
maxReconnectionDelay: 6000,
});
ws.binaryType = "arraybuffer";
wsRef.current = ws;
useAppStore.getState().setConnectionStatus("connecting");
ws.onopen = () => {
const store = useAppStore.getState();
store.setConnectionStatus("connected");
store.setWeakNetwork(false);
sendControl({ type: "hello", version: 2 });
startHeartbeat();
flushAudioQueue();
};
ws.onmessage = (e: MessageEvent) => {
if (typeof e.data === "string") {
handleServerMessage(e.data);
} }
}; };
ws.onclose = () => { ws.onclose = () => {
clearHeartbeat();
resolvePendingStart(null);
const store = useAppStore.getState(); const store = useAppStore.getState();
if (store.recording || store.pendingStart) {
toast.warning("连接中断,本次录音已结束");
}
store.setConnectionStatus("disconnected"); store.setConnectionStatus("disconnected");
if (store.recording) store.setRecording(false); store.setWeakNetwork(store.recording || audioQueueRef.current.length > 0);
if (store.pendingStart) store.setPendingStart(false); store.setRecording(false);
store.setPendingStart(false);
store.setStopping(false);
store.setActiveSessionId(null);
audioQueueRef.current = [];
}; };
wsRef.current = ws; ws.onerror = () => {
ws.close();
};
const onVisibilityChange = () => {
if (!document.hidden && wsRef.current?.readyState !== WebSocket.OPEN) {
wsRef.current?.close();
}
};
document.addEventListener("visibilitychange", onVisibilityChange);
return () => { return () => {
ws.close(); document.removeEventListener("visibilitychange", onVisibilityChange);
clearHeartbeat();
resolvePendingStart(null);
wsRef.current?.close();
wsRef.current = null; wsRef.current = null;
}; };
}, []); }, [
clearHeartbeat,
flushAudioQueue,
handleServerMessage,
resolvePendingStart,
sendControl,
startHeartbeat,
]);
return { sendJSON, sendBinary }; const requestStart = useCallback((): Promise<string | null> => {
const sessionId = createSessionId();
return new Promise<string | null>((resolve) => {
if (pendingStartRef.current) {
resolve(null);
return;
}
const timer = window.setTimeout(() => {
if (pendingStartRef.current?.sessionId === sessionId) {
pendingStartRef.current = null;
resolve(null);
}
}, START_TIMEOUT_MS);
pendingStartRef.current = { sessionId, resolve, timer };
const ok = sendControl({ type: "start", sessionId, version: 2 });
if (!ok) {
resolvePendingStart(null);
}
});
}, [resolvePendingStart, sendControl]);
const sendStop = useCallback(
(sessionId: string | null) => {
if (!sessionId) return;
audioQueueRef.current = audioQueueRef.current.filter(
(packet) => packet.sessionId !== sessionId,
);
if (audioQueueRef.current.length === 0) {
useAppStore.getState().setWeakNetwork(false);
}
sendControl({ type: "stop", sessionId });
},
[sendControl],
);
const sendPaste = useCallback(
(text: string) => {
const store = useAppStore.getState();
if (store.connectionStatus !== "connected") {
toast.warning("当前未连接,无法发送粘贴");
return;
}
const sessionId = useAppStore.getState().activeSessionId ?? undefined;
sendControl({ type: "paste", text, sessionId });
},
[sendControl],
);
return { requestStart, sendStop, sendPaste, sendAudioFrame };
} }