// p2p-explorer-web/src/pages/file/utils/fileTransfer.ts
//
// 470 lines
// 14 KiB
// TypeScript

import type { FileData, FileInfo } from "./fileMgr";
import { MessageType, peer } from "./peer";
import type { DataConnection } from "peerjs";
// Transfer status enum: the states a FileTransfer / TransferTask reports.
export enum TransferStatus {
WAITING = "waiting", // waiting to start
SENDING = "sending", // sending in progress
RECEIVING = "receiving", // receiving in progress
PAUSED = "paused", // paused
COMPLETED = "completed", // finished
ERROR = "error", // failed
}
// Transfer progress snapshot.
// NOTE(review): costTime units differ by producer in this file —
// FileTransfer.getProgress reports seconds since its previous call, while
// TransferTask.updateProgress reports milliseconds since the task was created.
export interface TransferProgress {
transferredSize: number; // size transferred so far
totalSize: number; // total size
speed: number; // transfer speed (bytes/s)
status: TransferStatus; // transfer status
percent: number; // progress percentage
costTime: number; // elapsed transfer time (unit depends on the producer, see note above)
updateTime: number; // when this snapshot was taken (Date.now() value)
}
// Chunking configuration
const CHUNK_SIZE = 64 * 1024; // 64KB; PeerJS is left to split this further into SCTP-safe packets
// Upper bound applied to the chunkSize passed to the FileTransfer constructor.
const MAX_CHUNK_SIZE = 64 * 1024;
// File size from which chunking is needed
export const NEED_CHUNK_FILE_SIZE = 200 * 1024; // 200KB
export const NEED_CHUNK_FILE_SIZE_PREVIEW = 50 * 1024 * 1024; // 50MB
// Pipeline flow-control configuration
const PIPELINE_DEPTH = 4; // at most 4 application-level chunks in flight (about 256KB)
const MAX_BUFFERED_AMOUNT = 512 * 1024; // 512KB - buffered-amount ceiling that triggers flow control
const DRAIN_THRESHOLD = 128 * 1024; // 128KB - buffered-amount floor at which sending resumes
/**
 * One file transfer over a PeerJS data connection, usable in either direction.
 *
 * - Sending (`sendFile`) slices the loaded file buffer into chunks and pushes
 *   them through `peer.send`, throttled by the data channel's buffered amount.
 * - Receiving (`receiveFile`) processes incoming chunks strictly one at a time
 *   and writes them either to the preview cache or to a file below the
 *   target directory handle.
 *
 * Every instance registers itself with its `FileInfo` and with
 * `fileTransferMgrInstance` in the constructor.
 */
export class FileTransfer {
  public conn: DataConnection;
  private file: FileInfo;
  private chunkSize: number;
  private transferredSize: number = 0;
  // Value of transferredSize at the previous getProgress() call (speed baseline).
  private lastTransferredSize: number = 0;
  private startTime: number = 0;
  private status: TransferStatus = TransferStatus.WAITING;
  // Set while a pause is requested; the send loop awaits it.
  private pausePromise?: Promise<void>;
  private pauseResolve?: () => void;
  private aborted: boolean = false;
  private preView: boolean = false;
  // Progress callback
  private onProgressCallback?: (transfer: FileTransfer) => void;

  public fileData: FileData;
  public offset: number = 0;
  public totalSize: number = 0;
  private fileBuffer: ArrayBuffer | null = null;
  // Tail of the chain of pending receiveFileChunk calls.
  private receiveQueue: Promise<void> = Promise.resolve();
  // Currently open output stream and the path it was opened for.
  private receiveWritable: FileSystemWritableFileStream | null = null;
  private receiveWritablePath: string = "";
  private tasks: TransferTask[] = [];

  /**
   * @param conn      Connection the chunks are sent over.
   * @param file      File (or target directory) this transfer belongs to.
   * @param chunkSize Requested chunk size in bytes; capped at MAX_CHUNK_SIZE.
   */
  constructor(
    conn: DataConnection,
    file: FileInfo,
    chunkSize: number = CHUNK_SIZE,
  ) {
    this.conn = conn;
    this.file = file;
    this.chunkSize = Math.min(chunkSize, MAX_CHUNK_SIZE);
    file.addTransfer(this);
    fileTransferMgrInstance.addFileTransfer(this);
  }

  /** Returns the FileInfo this transfer was created for. */
  public getFile(): FileInfo {
    return this.file;
  }

  /**
   * Resets the transfer to WAITING so it can be (re)started.
   * @param preView Whether the next send is flagged as a preview.
   * @returns this, for chaining.
   */
  public init(preView: boolean = false) {
    this.clear();
    this.status = TransferStatus.WAITING;
    this.preView = preView;
    return this;
  }

  /**
   * Drops all per-run state (offsets, buffers, pause/abort flags), closes any
   * open output stream in the background and starts a fresh receive queue.
   */
  public clear() {
    this.offset = 0;
    this.totalSize = 0;
    this.fileBuffer = null;
    this.fileData = null;
    this.transferredSize = 0;
    this.lastTransferredSize = 0;
    this.startTime = 0;
    this.pausePromise = undefined;
    this.pauseResolve = undefined;
    this.aborted = false;
    this.closeReceiveWritable().catch((error) => {
      console.error("close receive writable error", error);
    });
    this.receiveQueue = Promise.resolve();
  }

  /** Registers the single progress callback (replaces any previous one). */
  public onProgress(callback: (transfer: FileTransfer) => void) {
    this.onProgressCallback = callback;
  }

  /**
   * Returns a progress snapshot.
   * Note: each call also moves the speed baseline, so `speed` and `costTime`
   * describe the interval since the previous call.
   */
  public currentProgress(): TransferProgress {
    return this.getProgress();
  }

  /** Builds a progress snapshot and resets the speed-measurement window. */
  private getProgress(): TransferProgress {
    const now = Date.now();
    const timeElapsed = (now - this.startTime) / 1000; // seconds
    const speed =
      timeElapsed > 0
        ? (this.transferredSize - this.lastTransferredSize) / timeElapsed
        : 0;
    const totalSize = this.fileData?.chunkData?.totalSize || this.file.size;
    // A zero/unknown total would make the ratio NaN; report 100 only once the
    // transfer is actually completed, otherwise 0.
    let percent: number;
    if (totalSize > 0) {
      percent = (this.transferredSize / totalSize) * 100;
    } else {
      percent = this.status === TransferStatus.COMPLETED ? 100 : 0;
    }
    const progress: TransferProgress = {
      transferredSize: this.transferredSize,
      totalSize: totalSize,
      speed,
      status: this.status,
      percent,
      costTime: timeElapsed,
      updateTime: now,
    };
    // Move the measurement window forward for the next call.
    this.lastTransferredSize = this.transferredSize;
    this.startTime = now;
    return progress;
  }

  /**
   * Polls (every 10 ms) until the data channel's bufferedAmount is at or
   * below `targetBufferedAmount` and the connection's `bufferSize` is 0, or
   * until the transfer is aborted.
   * @throws Error if the connection or channel is no longer open while waiting.
   */
  private async waitForWritable(targetBufferedAmount = MAX_BUFFERED_AMOUNT) {
    const dc = this.conn.dataChannel;
    if (!dc) {
      await new Promise((resolve) => setTimeout(resolve, 10));
      return;
    }
    // `bufferSize` is not on the typed DataConnection API, hence the cast.
    // NOTE(review): presumably PeerJS's internal pending-send queue length — confirm.
    const getPeerBufferSize = () =>
      (this.conn as unknown as { bufferSize?: number }).bufferSize ?? 0;
    while (
      !this.aborted &&
      (dc.bufferedAmount > targetBufferedAmount || getPeerBufferSize() > 0)
    ) {
      if (!this.conn.open || dc.readyState !== "open") {
        throw new Error("WebRTC 数据通道已关闭");
      }
      await new Promise((resolve) => setTimeout(resolve, 10));
    }
  }

  /**
   * Returns the writable stream for `path`, reusing the cached one when the
   * path is unchanged; otherwise closes the cached stream and opens a new
   * one, creating intermediate directories through `file.createPath`.
   */
  private async getReceiveWritable(path: string) {
    if (this.receiveWritable && this.receiveWritablePath === path) {
      return this.receiveWritable;
    }
    // closeReceiveWritable drops the cached reference before awaiting close(),
    // so a failing close can no longer leave a stale stream cached.
    await this.closeReceiveWritable();
    let dir = this.file.fileDirHandler as FileSystemDirectoryHandle;
    let name = path;
    const lastSlash = path.lastIndexOf("/");
    if (lastSlash !== -1) {
      // Everything before the last "/" is the directory, the rest is the file name.
      dir = await this.file.createPath(path.slice(0, lastSlash));
      name = path.slice(lastSlash + 1);
    }
    const fileHandle = await dir.getFileHandle(name, { create: true });
    this.receiveWritable = await fileHandle.createWritable();
    this.receiveWritablePath = path;
    return this.receiveWritable;
  }

  /** Closes the cached output stream, if any, and forgets it. */
  private async closeReceiveWritable() {
    if (!this.receiveWritable) return;
    const writable = this.receiveWritable;
    this.receiveWritable = null;
    this.receiveWritablePath = "";
    await writable.close();
  }

  /**
   * Sends the file in chunks (pipelined with bounded depth so the SCTP buffer
   * is not overrun), then sends a completion message.
   *
   * When called in WAITING state the file is loaded first; in any other state
   * sending continues from the current `offset`.
   *
   * @param savePath Destination path forwarded to the receiver with the file data.
   * @returns true when the whole file and the completion message were sent,
   *          false when the transfer was aborted.
   * @throws Rethrows any error after setting the status to ERROR.
   */
  public async sendFile(savePath: string = ""): Promise<boolean> {
    try {
      if (this.status === TransferStatus.WAITING) {
        this.startTime = Date.now();
        this.fileData = (await this.file.getFile()) as FileData;
        this.totalSize = this.fileData.size;
        this.fileBuffer = this.fileData.buffer;
        this.fileData.preView = this.preView;
        this.fileData.savePath = savePath;
        this.fileData.buffer = null; // drop the full buffer; only chunks go on the wire
        this.addTask(new TransferTask(this.fileData, this.file));
        this.status = TransferStatus.SENDING;
        this.updateProgress();
      }
      const dc = this.conn.dataChannel;
      let sentSinceDrain = 0;
      // Pipelined send: at most PIPELINE_DEPTH chunks per round, then drain.
      while (this.offset < this.totalSize && !this.aborted) {
        if (this.pausePromise) {
          this.status = TransferStatus.PAUSED;
          this.updateProgress();
          await this.pausePromise;
          this.status = TransferStatus.SENDING;
          // abort() releases the pause; do not send another chunk afterwards.
          if (this.aborted) break;
        }
        await this.waitForWritable();
        // waitForWritable returns early on abort; stop before touching the wire.
        if (this.aborted) break;
        if (!this.fileBuffer) {
          throw new Error("文件缓冲区为空,无法发送");
        }
        const end = Math.min(this.offset + this.chunkSize, this.totalSize);
        this.fileData.chunkData = {
          offset: this.offset,
          totalSize: this.totalSize,
          buffer: this.fileBuffer.slice(this.offset, end),
        };
        await peer.send(
          {
            type: MessageType.push_file_chunk,
            data: this.fileData,
          },
          this.conn,
        );
        this.offset = end;
        this.transferredSize = this.offset;
        this.updateProgress();
        sentSinceDrain++;
        // Back-pressure: once the depth limit is hit or the channel buffer is
        // too full, wait until it drains to DRAIN_THRESHOLD.
        if (
          sentSinceDrain >= PIPELINE_DEPTH ||
          (dc && dc.bufferedAmount > MAX_BUFFERED_AMOUNT)
        ) {
          sentSinceDrain = 0;
          await this.waitForWritable(DRAIN_THRESHOLD);
        }
      }
      if (this.aborted) {
        return false;
      }
      // Only the final completion notice is sent after the last chunk.
      await peer.send(
        {
          type: MessageType.push_file_complete,
          data: this.fileData,
        },
        this.conn,
      );
      this.status = TransferStatus.COMPLETED;
      this.updateProgress();
      return true;
    } catch (error) {
      this.status = TransferStatus.ERROR;
      this.updateProgress();
      throw error;
    }
  }

  /**
   * Queues one incoming chunk for processing. Chunks are handled one after
   * another in call order; a failed chunk does not stop later ones from
   * being processed.
   * @returns A promise that settles when this chunk has been handled.
   */
  public async receiveFile(
    fData: FileData,
    preView: boolean = false,
  ): Promise<void> {
    const handleChunk = () => this.receiveFileChunk(fData, preView);
    this.receiveQueue = this.receiveQueue.then(handleChunk, handleChunk);
    return this.receiveQueue;
  }

  /**
   * Handles a single chunk: updates task and progress state, then stores the
   * bytes in the preview cache or writes them at the chunk's offset in the
   * target file. The output stream is closed once the last chunk is written.
   * @throws Rethrows any error after setting the status to ERROR.
   */
  private async receiveFileChunk(
    fData: FileData,
    preView: boolean = false,
  ): Promise<void> {
    try {
      const task = this.getTask(fData);
      if (!task) {
        this.addTask(new TransferTask(fData, this.file));
      } else {
        task.updateFileData(fData, TransferStatus.RECEIVING);
      }
      this.status = TransferStatus.RECEIVING;
      this.transferredSize = fData.chunkData.offset;
      this.startTime = Date.now();
      this.totalSize = fData.chunkData.totalSize;
      // The chunk that reaches the end of the file marks completion.
      if (
        fData.chunkData.totalSize <=
        fData.chunkData.buffer.byteLength + fData.chunkData.offset
      ) {
        this.status = TransferStatus.COMPLETED;
      }
      if (preView) {
        await this.file.addPreviewCacheBuffer(
          fData.chunkData.buffer,
          fData.chunkData.offset,
          fData.chunkData.totalSize,
        );
      } else {
        const path = fData.savePath + "/" + fData.name;
        const writable = await this.getReceiveWritable(path);
        await writable.write({
          type: "write",
          position: fData.chunkData.offset,
          data: fData.chunkData.buffer,
        });
        if (this.status === TransferStatus.COMPLETED) {
          await this.closeReceiveWritable();
        }
      }
      this.updateProgress(fData);
    } catch (error) {
      this.status = TransferStatus.ERROR;
      this.updateProgress(fData);
      throw error;
    }
  }

  /**
   * Requests a pause. Only has an effect while SENDING or RECEIVING; the send
   * loop honours it before its next chunk.
   */
  public pause() {
    // Keep an already pending pause: replacing it would orphan its resolver.
    if (this.pausePromise) return;
    if (
      this.status === TransferStatus.SENDING ||
      this.status === TransferStatus.RECEIVING
    ) {
      this.pausePromise = new Promise((resolve) => {
        this.pauseResolve = resolve;
      });
    }
  }

  /** Releases a pending pause, if there is one. */
  public resume() {
    if (this.pauseResolve) {
      this.pauseResolve();
      this.pausePromise = undefined;
      this.pauseResolve = undefined;
    }
  }

  /** Cancels the transfer: flags it aborted, closes the output stream, releases any pause. */
  public abort() {
    this.aborted = true;
    this.closeReceiveWritable().catch((error) => {
      console.error("close receive writable error", error);
    });
    this.resume(); // wake a paused send loop so it can exit
  }

  /** Syncs the matching task with `fData` and the current status, then fires the progress callback. */
  private updateProgress(fData: FileData = this.fileData) {
    this.updateTask(fData);
    this.onProgressCallback?.(this);
  }

  /** Records a transfer task and immediately publishes progress for it. */
  public addTask(task: TransferTask) {
    this.tasks.push(task);
    this.updateProgress(task.fileData);
    return task;
  }

  /** Returns all tasks after refreshing each task's progress. */
  public getTasks() {
    this.tasks.forEach((task) => task.updateProgress());
    return this.tasks;
  }

  /**
   * Pushes `fData` and the current status into the task with the same path.
   * Does nothing when `fData` is missing or no such task exists — this is
   * reachable from the error path of `sendFile` before any task was created,
   * and must not replace the original error with a TypeError.
   */
  public updateTask(fData: FileData) {
    if (!fData) return;
    this.getTask(fData)?.updateFileData(fData, this.status);
  }

  /** Finds the task whose file path equals `fData.path`, if any. */
  public getTask(fData: FileData) {
    return this.tasks.find((task) => task.fileData.path === fData.path);
  }

  /** Removes tasks that have reached COMPLETED. */
  public clearCompletedTasks() {
    this.tasks = this.tasks.filter(
      (t) => t.status !== TransferStatus.COMPLETED,
    );
  }
}
/**
 * Registry of FileTransfer instances keyed by the path of their file, and
 * the fan-out point through which progress changes reach subscribers.
 */
class FileTransferMgr {
  private fileTransfers: Map<string, FileTransfer> = new Map();
  // Subscribers notified whenever any registered transfer reports progress.
  private onTransferChangedHandler: ((transfer: FileTransfer) => void)[] = [];

  /**
   * Hooks the transfer's progress callback up to this manager and stores it
   * under its file path (a later transfer for the same path replaces it).
   */
  public addFileTransfer(transfer: FileTransfer) {
    transfer.onProgress((changed) => this.notifyTransferChanged(changed));
    const key = transfer.getFile().path;
    this.fileTransfers.set(key, transfer);
  }

  /** Looks up the transfer registered for `path`, or undefined. */
  public getFileTransfer(path: string) {
    return this.fileTransfers.get(path);
  }

  /** Returns every registered transfer as a new array, in registration order. */
  public getAllFileTransfers() {
    return [...this.fileTransfers.values()];
  }

  /** Subscribes `handler` to progress changes. */
  public onTransferChanged(handler: (transfer: FileTransfer) => void) {
    this.onTransferChangedHandler.push(handler);
  }

  /** Unsubscribes every registration of `handler`. */
  public removeTransferChangedHandler(
    handler: (transfer: FileTransfer) => void,
  ) {
    const remaining: ((transfer: FileTransfer) => void)[] = [];
    for (const registered of this.onTransferChangedHandler) {
      if (registered !== handler) {
        remaining.push(registered);
      }
    }
    this.onTransferChangedHandler = remaining;
  }

  /** Calls every subscriber that was registered when the notification started. */
  public notifyTransferChanged(transfer: FileTransfer) {
    // Iterate a snapshot so subscribers added during dispatch are not called this round.
    const listeners = [...this.onTransferChangedHandler];
    for (const listener of listeners) {
      listener(transfer);
    }
  }
}
/**
 * Progress record for one file handled by a FileTransfer.
 *
 * Holds the most recent file payload (metadata plus the current chunk, if
 * any) and derives a TransferProgress snapshot from it on demand.
 */
export class TransferTask {
  // File payload being transferred
  fileData: FileData;
  // Directory acting as the receiving carrier
  file: FileInfo;
  // Creation time (Date.now()); baseline for speed and costTime
  startTime: number = 0;
  // Transfer status
  status: TransferStatus = TransferStatus.WAITING;
  // Last computed progress snapshot
  progress: TransferProgress = {
    transferredSize: 0,
    totalSize: 0,
    speed: 0,
    status: TransferStatus.WAITING,
    percent: 0,
    costTime: 0,
    updateTime: 0,
  };

  constructor(fileData: FileData, file: FileInfo) {
    this.fileData = fileData;
    this.file = file;
    this.startTime = Date.now();
  }

  /**
   * Replaces the payload and status, then recomputes the progress snapshot.
   * @param fileData Latest payload for this file.
   * @param status   Status to record; defaults to SENDING.
   */
  public updateFileData(
    fileData: FileData,
    status: TransferStatus = TransferStatus.SENDING,
  ) {
    this.fileData = fileData;
    this.status = status;
    this.updateProgress();
  }

  /**
   * Recomputes and stores the progress snapshot.
   *
   * With chunk data, the transferred size is the chunk's offset plus its
   * length; without chunk data the whole file counts as transferred. The
   * status becomes COMPLETED once the transferred size reaches the total.
   * `speed` is in bytes per second and `costTime` in milliseconds, both
   * measured from the task's creation.
   *
   * @returns The freshly computed snapshot (also kept in `this.progress`).
   */
  public updateProgress(): TransferProgress {
    // One clock sample keeps speed, costTime and updateTime consistent.
    const now = Date.now();
    const elapsedMs = now - this.startTime;
    const chunk = this.fileData.chunkData;

    const totalSize = chunk?.totalSize || this.fileData.size;
    const transferredSize = chunk
      ? chunk.offset + chunk.buffer.byteLength
      : this.fileData.size;
    const done = transferredSize >= totalSize;

    // Zero elapsed time makes the raw rate Infinity (bytes moved) or NaN
    // (nothing moved); report the total size or 0 respectively.
    const rawSpeed = (transferredSize / elapsedMs) * 1000;
    let speed = rawSpeed;
    if (!Number.isFinite(rawSpeed)) {
      speed = transferredSize > 0 ? totalSize : 0;
    }

    // A zero/unknown total would give NaN; fall back on the completion state.
    let percent: number;
    if (totalSize > 0) {
      percent = (transferredSize / totalSize) * 100;
    } else {
      percent = done ? 100 : 0;
    }

    if (done) {
      this.status = TransferStatus.COMPLETED;
    }
    this.progress = {
      transferredSize,
      totalSize,
      speed,
      status: this.status,
      percent,
      costTime: elapsedMs,
      updateTime: now,
    };
    return this.progress;
  }
}
// Shared FileTransferMgr instance for this module; the FileTransfer constructor
// registers every new transfer with it.
export const fileTransferMgrInstance = new FileTransferMgr();