import type { FileData, FileInfo } from "./fileMgr"; import { MessageType, peer } from "./peer"; import type { DataConnection } from "peerjs"; // 传输状态枚举 export enum TransferStatus { WAITING = "waiting", // 等待传输 SENDING = "sending", // 发送中 RECEIVING = "receiving", // 接收中 PAUSED = "paused", // 已暂停 COMPLETED = "completed", // 已完成 ERROR = "error", // 错误 } // 传输进度接口 export interface TransferProgress { transferredSize: number; // 已传输大小 totalSize: number; // 总大小 speed: number; // 传输速度 (bytes/s) status: TransferStatus; // 传输状态 percent: number; // 进度百分比 costTime: number; // 传输时间 updateTime: number; // 更新时间 } // 分片配置 const CHUNK_SIZE = 64 * 1024; // 64KB, 交给 PeerJS 再切成 SCTP 安全包 const MAX_CHUNK_SIZE = 64 * 1024; //多大的文件需要分片 export const NEED_CHUNK_FILE_SIZE = 200 * 1024; // 200KB export const NEED_CHUNK_FILE_SIZE_PREVIEW = 50 * 1024 * 1024; // 50MB // 流水线流控配置 const PIPELINE_DEPTH = 4; // 最多4个应用层分片在途(约256KB) const MAX_BUFFERED_AMOUNT = 512 * 1024; // 512KB - 触发流控的缓冲上限 const DRAIN_THRESHOLD = 128 * 1024; // 128KB - 恢复发送的缓冲下限 export class FileTransfer { public conn: DataConnection; private file: FileInfo; private chunkSize: number; private transferredSize: number = 0; private lastTransferredSize: number = 0; private startTime: number = 0; private status: TransferStatus = TransferStatus.WAITING; private pausePromise?: Promise; private pauseResolve?: () => void; private aborted: boolean = false; private preView: boolean = false; // 进度回调 private onProgressCallback?: (transfer: FileTransfer) => void; public getFile(): FileInfo { return this.file; } constructor( conn: DataConnection, file: FileInfo, chunkSize: number = CHUNK_SIZE, ) { this.conn = conn; this.file = file; this.chunkSize = Math.min(chunkSize, MAX_CHUNK_SIZE); file.addTransfer(this); fileTransferMgrInstance.addFileTransfer(this); } public init(preView: boolean = false) { this.clear(); this.status = TransferStatus.WAITING; this.preView = preView; return this; } public clear() { this.offset = 0; this.totalSize = 0; this.fileBuffer = null; this.fileData = null; this.transferredSize = 0; this.lastTransferredSize = 0; this.startTime = 0; this.pausePromise = undefined; this.pauseResolve = undefined; this.aborted = false; } // 设置进度回调 public onProgress(callback: (transfer: FileTransfer) => void) { this.onProgressCallback = callback; } public currentProgress(): TransferProgress { return this.getProgress(); } // 获取当前进度 private getProgress(): TransferProgress { const now = Date.now(); const timeElapsed = (now - this.startTime) / 1000; // 转换为秒 const speed = timeElapsed > 0 ? (this.transferredSize - this.lastTransferredSize) / timeElapsed : 0; let totalSize = this.fileData?.chunkData?.totalSize || this.file.size; const progress: TransferProgress = { transferredSize: this.transferredSize, totalSize: totalSize, speed, status: this.status, percent: (this.transferredSize / totalSize) * 100, costTime: timeElapsed, updateTime: now, }; // 更新上次传输大小和开始时间 this.lastTransferredSize = this.transferredSize; this.startTime = now; return progress; } public fileData: FileData; public offset: number = 0; public totalSize: number = 0; private fileBuffer: ArrayBuffer | null = null; private async waitForWritable(targetBufferedAmount = MAX_BUFFERED_AMOUNT) { const dc = this.conn.dataChannel; if (!dc) { await new Promise((resolve) => setTimeout(resolve, 10)); return; } const getPeerBufferSize = () => ((this.conn as unknown as { bufferSize?: number }).bufferSize ?? 0); while ( !this.aborted && (dc.bufferedAmount > targetBufferedAmount || getPeerBufferSize() > 0) ) { if (!this.conn.open || dc.readyState !== "open") { throw new Error("WebRTC 数据通道已关闭"); } await new Promise((resolve) => setTimeout(resolve, 10)); } } // 发送文件 (流水线模式 - 受限深度,防止撑爆SCTP缓冲区) public async sendFile(savePath: string = ""): Promise { try { if (this.status == TransferStatus.WAITING) { this.startTime = Date.now(); this.fileData = (await this.file.getFile()) as FileData; this.totalSize = this.fileData.size; this.fileBuffer = this.fileData.buffer; this.fileData.preView = this.preView; this.fileData.savePath = savePath; this.fileData.buffer = null; // 移除完整buffer,只传分片 this.addTask(new TransferTask(this.fileData, this.file)); this.status = TransferStatus.SENDING; this.updateProgress(); } const dc = this.conn.dataChannel; let sentSinceDrain = 0; // 流水线发送: 每轮最多发 PIPELINE_DEPTH 个分片,然后等待排空 while (this.offset < this.totalSize && !this.aborted) { if (this.pausePromise) { this.status = TransferStatus.PAUSED; this.updateProgress(); await this.pausePromise; this.status = TransferStatus.SENDING; } const end = Math.min(this.offset + this.chunkSize, this.totalSize); const chunk = this.fileBuffer.slice(this.offset, end); this.fileData.chunkData = { offset: this.offset, totalSize: this.totalSize, buffer: chunk, }; await this.waitForWritable(); await peer.send( { type: MessageType.push_file_chunk, data: this.fileData, }, this.conn, true, ); this.offset = end; this.transferredSize = this.offset; this.updateProgress(); sentSinceDrain++; // 背压控制: 达到深度限制或缓冲过大时,等待排空 if ( sentSinceDrain >= PIPELINE_DEPTH || (dc && dc.bufferedAmount > MAX_BUFFERED_AMOUNT) ) { sentSinceDrain = 0; await this.waitForWritable(DRAIN_THRESHOLD); } } if (!this.aborted) { // 仅等待最终完成确认 await peer.send( { type: MessageType.push_file_complete, data: this.fileData, }, this.conn, ); this.status = TransferStatus.COMPLETED; this.updateProgress(); return true; } return false; } catch (error) { this.status = TransferStatus.ERROR; this.updateProgress(); throw error; } } // 接收文件 public async receiveFile( fData: FileData, preView: boolean = false, ): Promise { try { let task = this.getTask(fData); if (!task) { this.addTask(new TransferTask(fData, this.file)); } else { task.updateFileData(fData, TransferStatus.RECEIVING); } this.status = TransferStatus.RECEIVING; this.transferredSize = fData.chunkData.offset; this.startTime = Date.now(); this.totalSize = fData.chunkData.totalSize; if ( fData.chunkData.totalSize <= fData.chunkData.buffer.byteLength + fData.chunkData.offset ) { this.status = TransferStatus.COMPLETED; } if (preView) { await this.file.addPreviewCacheBuffer( fData.chunkData.buffer, fData.chunkData.offset, fData.chunkData.totalSize, ); } else { const path = fData.savePath + "/" + fData.name; await this.file.createFile( path, fData.chunkData.buffer, fData.chunkData.offset, ); // if (this.status == TransferStatus.COMPLETED) { // await this.file.renameFile(path, fData.savePath + '/' + fData.name); // } } this.updateProgress(fData); return; } catch (error) { this.status = TransferStatus.ERROR; this.updateProgress(fData); throw error; } } // 暂停传输 public pause() { if ( this.status === TransferStatus.SENDING || this.status === TransferStatus.RECEIVING ) { this.pausePromise = new Promise((resolve) => { this.pauseResolve = resolve; }); } } // 恢复传输 public resume() { if (this.pauseResolve) { this.pauseResolve(); this.pausePromise = undefined; this.pauseResolve = undefined; } } // 取消传输 public abort() { this.aborted = true; this.resume(); // 恢复暂停的传输以便能够正确退出 } // 更新进度 private updateProgress(fData: FileData = this.fileData) { this.updateTask(fData); if (this.onProgressCallback) { this.onProgressCallback(this); } } private tasks: TransferTask[] = []; //记录传输任务 public addTask(task: TransferTask) { this.tasks.push(task); this.updateProgress(task.fileData); return task; } public getTasks() { this.tasks.forEach((task) => task.updateProgress()); return this.tasks; } public updateTask(fData: FileData) { const task = this.tasks.find((task) => task.fileData.path === fData.path); task.updateFileData(fData, this.status); } public getTask(fData: FileData) { return this.tasks.find((task) => task.fileData.path === fData.path); } //清除已完成任务 public clearCompletedTasks() { this.tasks = this.tasks.filter( (t) => t.status !== TransferStatus.COMPLETED, ); } } class FileTransferMgr { private fileTransfers: Map = new Map(); public addFileTransfer(transfer: FileTransfer) { transfer.onProgress(this.notifyTransferChanged.bind(this)); this.fileTransfers.set(transfer.getFile().path, transfer); } public getFileTransfer(path: string) { return this.fileTransfers.get(path); } public getAllFileTransfers() { return Array.from(this.fileTransfers.values()); } // 传输进度变化回调 private onTransferChangedHandler: ((transfer: FileTransfer) => void)[] = []; public onTransferChanged(handler: (transfer: FileTransfer) => void) { this.onTransferChangedHandler.push(handler); } public removeTransferChangedHandler( handler: (transfer: FileTransfer) => void, ) { this.onTransferChangedHandler = this.onTransferChangedHandler.filter( (h) => h !== handler, ); } public notifyTransferChanged(transfer: FileTransfer) { this.onTransferChangedHandler.forEach((handler) => handler(transfer)); } } export class TransferTask { //传输文件数据 fileData: FileData; //接收载体目录 file: FileInfo; //开始时间 startTime: number = 0; //传输状态 status: TransferStatus = TransferStatus.WAITING; //传输进度 progress: TransferProgress = { transferredSize: 0, totalSize: 0, speed: 0, status: TransferStatus.WAITING, percent: 0, costTime: 0, updateTime: 0, }; constructor(fileData: FileData, file: FileInfo) { this.fileData = fileData; this.file = file; this.startTime = Date.now(); } public updateFileData( fileData: FileData, status: TransferStatus = TransferStatus.SENDING, ) { this.fileData = fileData; this.status = status; this.updateProgress(); } public updateProgress(): TransferProgress { if (this.status == TransferStatus.COMPLETED) { this.progress.updateTime = Date.now(); return this.progress; } const totalSize = this.fileData.chunkData?.totalSize || this.fileData.size; const transferredSize = this.fileData.chunkData?.offset || this.fileData.size; const speed = this.fileData.chunkData ? ((transferredSize + this.fileData.chunkData.buffer.byteLength) / (Date.now() - this.startTime)) * 1000 : (this.fileData.size / (Date.now() - this.startTime)) * 1000; const percent = this.fileData.chunkData ? ((transferredSize + this.fileData.chunkData.buffer.byteLength) / totalSize) * 100 : (transferredSize / totalSize) * 100; this.status = transferredSize >= totalSize ? TransferStatus.COMPLETED : this.status; this.progress = { transferredSize: transferredSize, totalSize: totalSize, speed: speed == Infinity ? totalSize : speed, status: this.status, percent: percent, costTime: Date.now() - this.startTime, updateTime: Date.now(), }; return this.progress; } } export const fileTransferMgrInstance = new FileTransferMgr();