From 4110687e022e4b996b6e3bb4b61b38a70d1ba5ac Mon Sep 17 00:00:00 2001 From: kura Date: Thu, 7 May 2026 17:36:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86=E5=88=87=E7=89=87?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=EF=BC=8C=E9=81=BF=E5=85=8D=E6=95=B0=E9=87=8F?= =?UTF-8?q?=E8=BF=87=E5=A4=A7=E6=8C=A4=E7=88=86=E6=B5=8F=E8=A7=88=E5=99=A8?= =?UTF-8?q?=E4=BF=9D=E7=95=99=E7=9A=84=E7=BC=93=E5=86=B2=E5=8C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pages/file/utils/fileTransfer.ts | 62 ++++++++++++++++++---------- src/pages/file/utils/peer.ts | 46 +++++++++++++++------ 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/pages/file/utils/fileTransfer.ts b/src/pages/file/utils/fileTransfer.ts index 3b7c98c..5a71053 100644 --- a/src/pages/file/utils/fileTransfer.ts +++ b/src/pages/file/utils/fileTransfer.ts @@ -1,6 +1,6 @@ -import { type FileData, type FileInfo } from "./fileMgr"; +import type { FileData, FileInfo } from "./fileMgr"; import { MessageType, peer } from "./peer"; -import { type DataConnection } from "peerjs"; +import type { DataConnection } from "peerjs"; // 传输状态枚举 export enum TransferStatus { WAITING = "waiting", // 等待传输 @@ -23,15 +23,16 @@ export interface TransferProgress { } // 分片配置 -const CHUNK_SIZE = 256 * 1024; // 256KB (从64KB提升,减少消息数量) -const MAX_CHUNK_SIZE = 256 * 1024; // 256KB (WebRTC SCTP 最大安全消息大小) +const CHUNK_SIZE = 64 * 1024; // 64KB, 交给 PeerJS 再切成 SCTP 安全包 +const MAX_CHUNK_SIZE = 64 * 1024; //多大的文件需要分片 export const NEED_CHUNK_FILE_SIZE = 200 * 1024; // 200KB export const NEED_CHUNK_FILE_SIZE_PREVIEW = 50 * 1024 * 1024; // 50MB // 流水线流控配置 -const MAX_BUFFERED_AMOUNT = 1024 * 1024; // 1MB - 触发流控的缓冲上限 -const DRAIN_THRESHOLD = 256 * 1024; // 256KB - 恢复发送的缓冲下限 +const PIPELINE_DEPTH = 4; // 最多4个应用层分片在途(约256KB) +const MAX_BUFFERED_AMOUNT = 512 * 1024; // 512KB - 触发流控的缓冲上限 +const DRAIN_THRESHOLD = 128 * 1024; // 128KB - 恢复发送的缓冲下限 export class FileTransfer { public conn: DataConnection; @@ -116,7 +117,27 @@ export class FileTransfer { public offset: number = 0; public totalSize: number = 0; private fileBuffer: ArrayBuffer | null = null; - // 发送文件 (流水线模式 - 不再逐片等待确认) + private async waitForWritable(targetBufferedAmount = MAX_BUFFERED_AMOUNT) { + const dc = this.conn.dataChannel; + if (!dc) { + await new Promise((resolve) => setTimeout(resolve, 10)); + return; + } + + const getPeerBufferSize = () => + ((this.conn as unknown as { bufferSize?: number }).bufferSize ?? 0); + + while ( + !this.aborted && + (dc.bufferedAmount > targetBufferedAmount || getPeerBufferSize() > 0) + ) { + if (!this.conn.open || dc.readyState !== "open") { + throw new Error("WebRTC 数据通道已关闭"); + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + } + // 发送文件 (流水线模式 - 受限深度,防止撑爆SCTP缓冲区) public async sendFile(savePath: string = ""): Promise { try { if (this.status == TransferStatus.WAITING) { @@ -133,8 +154,9 @@ export class FileTransfer { } const dc = this.conn.dataChannel; + let sentSinceDrain = 0; - // 流水线发送: 不再逐片等待确认,通过 bufferedAmount 控制背压 + // 流水线发送: 每轮最多发 PIPELINE_DEPTH 个分片,然后等待排空 while (this.offset < this.totalSize && !this.aborted) { if (this.pausePromise) { this.status = TransferStatus.PAUSED; @@ -151,8 +173,8 @@ export class FileTransfer { buffer: chunk, }; - // 发送分片 (fire-and-forget,不等确认) - peer.send( + await this.waitForWritable(); + await peer.send( { type: MessageType.push_file_chunk, data: this.fileData, @@ -164,19 +186,15 @@ export class FileTransfer { this.offset = end; this.transferredSize = this.offset; this.updateProgress(); + sentSinceDrain++; - // 基于 bufferedAmount 的流控: 缓冲过大时等待排空 - if (dc && dc.bufferedAmount > MAX_BUFFERED_AMOUNT) { - await new Promise((resolve) => { - const checkBuffer = () => { - if (dc.bufferedAmount < DRAIN_THRESHOLD || this.aborted) { - resolve(); - } else { - requestAnimationFrame(checkBuffer); - } - }; - checkBuffer(); - }); + // 背压控制: 达到深度限制或缓冲过大时,等待排空 + if ( + sentSinceDrain >= PIPELINE_DEPTH || + (dc && dc.bufferedAmount > MAX_BUFFERED_AMOUNT) + ) { + sentSinceDrain = 0; + await this.waitForWritable(DRAIN_THRESHOLD); } } diff --git a/src/pages/file/utils/peer.ts b/src/pages/file/utils/peer.ts index cc4174d..c4a825f 100644 --- a/src/pages/file/utils/peer.ts +++ b/src/pages/file/utils/peer.ts @@ -294,7 +294,25 @@ class Peer extends EventTarget { }); return Promise.reject("连接不存在"); } - conn.send(data, true); + if (!conn.open) { + notification.error({ + message: "连接未打开", + description: "WebRTC 数据通道已关闭或尚未建立", + }); + return Promise.reject(new Error("连接未打开")); + } + try { + const sendResult = conn.send(data); + if (sendResult instanceof Promise) { + await sendResult; + } + } catch (error) { + notification.error({ + message: "发送失败", + description: error instanceof Error ? error.message : String(error), + }); + return Promise.reject(error); + } if (isHandleResponse) { return Promise.resolve(data.data); } else { @@ -331,20 +349,24 @@ class Peer extends EventTarget { /**共交换的包数 */ public transpackNum: number = 0; private checkConnection() { - if (!this.remoteConnection) { + if (!this.remoteConnection || !this.remoteConnection.peerConnection) { return; } - const rtcp: RTCPeerConnection = this.remoteConnection.peerConnection; - rtcp.getStats().then((stats) => { - stats.forEach((stat) => { - if (stat.type == "data-channel") { - //流量 - this.transbytesNum = stat.bytesReceived + stat.bytesSent; - //包数 - this.transpackNum = stat.messagesReceived + stat.messagesSent; - } + this.remoteConnection.peerConnection + .getStats() + .then((stats) => { + stats.forEach((stat) => { + if (stat.type == "data-channel") { + //流量 + this.transbytesNum = stat.bytesReceived + stat.bytesSent; + //包数 + this.transpackNum = stat.messagesReceived + stat.messagesSent; + } + }); + }) + .catch((err) => { + console.warn("checkConnection getStats error:", err); }); - }); setTimeout(() => { this.checkConnection(); }, 1000);