Vue 3 中用组合式函数和 Shared Worker 实现后台分片上传(带哈希计算)

  • Vue 3 中用组合式函数和 Shared Worker 实现后台分片上传(带哈希计算)已关闭评论
  • 79 次浏览
  • A+
所属分类:Web前端
摘要

最近项目需求里有个文件上传功能,而客户需求里的文件基本上是比较大的,基本上得有 1 GiB 以上的大小,而上传大文件尤其是读大文件,可能会造成卡 UI 或者说点不动的问题。而用后台的 Worker 去实现是一个比较不错的解决办法。


01. 背景

最近项目需求里有个文件上传功能,而客户需求里的文件基本上是比较大的,基本上得有 1 GiB 以上的大小,而上传大文件尤其是读大文件,可能会造成卡 UI 或者说点不动的问题。而用后台的 Worker 去实现是一个比较不错的解决办法。

02. 原理讲解

02.01. Shared Worker

Shared Worker 的好处是可以从几个浏览上下文中访问,例如几个窗口、iframe 或其他 worker。这样我们可以保证全局的页面上传任务都在我们的控制之下,甚至可以防止重复提交等功能。

02.02. 组合式函数

组合式函数的好处是在 Vue 3 是可以在任何 *.vue 文件中使用,并且是响应式方法,可以侦听 pinia 内 token 等的变化,传递给 Worker

02.03 简单流程设计

flowchart TB id1[用户选择文件] --> id2[创建上传任务] id2 --> id3[任务推送到 Worker] id3 --> id4[上传到服务器] id4 --> id5[Worker 返回任务状态] id5 --> id6[组合式函数拦截状态放到 Map 里]

03. 代码

upload-worker.ts 代码

import { sha256 } from '@noble/hashes/sha256'; import { bytesToHex as toHex } from '@noble/hashes/utils'; interface SharedWorkerGlobalScope {   onconnect: (event: MessageEvent<any>) => void; } const _self: SharedWorkerGlobalScope = self as any; /**  * 分片大小  */ const pieceSize = 1024 * 1024; /**  * 消息参数  */ interface MessageArg<T> {   /**    * 函数名    */   func: string;   /**    * 参数    */   arg: T; } /**  * 上传任务信息  */ interface UploadTaskInfo {   /**    * 文件名    */   fileName: string;   /**    * 上传路径    */   uploadPath: string;   /**    * 任务 id    */   id: string;   /**    * 文件大小    */   size: number;   /**    * 上传进度    */   progress: number;   /**    * 上传速度    */   speed?: number;   /**    * 任务状态    */   status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';   /**    * 开始时间    */   startTime?: Date;   /**    * 结束时间    */   endTime?: Date;   /**    * 错误信息    */   errorMessage?: string; } /**  * 上传任务  */ interface UploadTask extends UploadTaskInfo {   file: File;   pieces: Array<boolean>;   abort?: AbortController; } /**  * 任务/哈希值映射  */ const hashs = new Map(); /**  * 上传任务列表  */ const uploadTasks: Array<UploadTask> = []; /**  * 状态接收器  */ const statusReceivers = new Map<string, MessagePort>(); /**  * token 仓库  */ const tokenStore = {   /**    * token    */   BearerToken: '', }; /**  * 返回上传状态  * @param task 上传任务  */ const updateStatus = (task: UploadTaskInfo) => {   const taskInfo: UploadTaskInfo = {     fileName: task.fileName,     uploadPath: task.uploadPath,     id: task.id,     size: task.size,     progress: task.progress,     speed: task.speed,     status: task.status,     startTime: task.startTime,     endTime: task.endTime,     errorMessage: task.errorMessage,   };   statusReceivers.forEach((item) => {     item.postMessage(taskInfo);   }); }; /**  * 运行上传任务  * @param task 上传任务  */ const runUpload = async (task: UploadTask) => {   task.status = 'uploading';   const hash = hashs.get(task.id) || sha256.create();   hashs.set(task.id, hash);   let retryCount = 0;   const abort = new AbortController();   task.abort = abort;   while (task.status === 'uploading') {     const startTime = Date.now();     const index = task.pieces.findIndex((item) => !item);     if (index === -1) {       try {         const response: { code: number; message: string } = await fetch(           '/api/File/Upload',           {             method: 'PUT',             headers: {               Authorization: tokenStore.BearerToken,               'Content-Type': 'application/json',             },             body: JSON.stringify({               id: task.id,               fileHash: toHex(hash.digest()),               filePath: task.uploadPath,             }),           }         ).then((res) => res.json());         if (response.code !== 200) {           throw new Error(response.message);         }         task.status = 'done';         task.endTime = new Date();         updateStatus(task);       } catch (e: any) {         task.status = 'error';         task.errorMessage = e.toString();         task.endTime = new Date();         deleteUpload(task.id);         updateStatus(task);       }       break;     }     const start = index * pieceSize;     const end = start + pieceSize >= task.size ? task.size : start + pieceSize;     const buffer = task.file.slice(index * pieceSize, end);     hash.update(new Uint8Array(await buffer.arrayBuffer()));     const form = new FormData();     form.append('file', buffer);     let isTimeout = false;     try {       const timer = setTimeout(() => {         isTimeout = true;         abort.abort();       }, 8000);       const response: { code: number; message: string } = await fetch(         `/api/File/Upload?id=${task.id}&offset=${start}`,         {           method: 'POST',           body: form,           headers: {             Authorization: tokenStore.BearerToken,           },           signal: abort.signal,         }       ).then((res) => res.json());       clearTimeout(timer);       if (response.code !== 200) {         throw new Error(response.message);       }       task.pieces[index] = true;       task.progress =         task.pieces.filter((item) => item).length / task.pieces.length;       task.speed = (pieceSize / (Date.now() - startTime)) * 1000;       updateStatus(task);     } catch (e: any) {       retryCount++;       if (retryCount > 3) {         task.status = 'error';         if (isTimeout) {           task.errorMessage = 'UploadTimeout';         } else {           task.errorMessage = e.toString();         }         task.endTime = new Date();         deleteUpload(task.id);         updateStatus(task);       }     }     runNextUpload();   } }; /**  * 运行下一个上传任务  */ const runNextUpload = async () => {   if (uploadTasks.filter((item) => item.status === 'uploading').length > 3) {     return;   }   const task = uploadTasks.find((item) => item.status === 'waiting');   if (task) {     await runUpload(task);   } }; /**  * 排队上传  * @param e 消息事件  */ const queueUpload = async (   e: MessageEvent<     MessageArg<{       id: string;       file: File;       uploadPath: string;     }>   > ) => {   uploadTasks.push({     file: e.data.arg.file,     fileName: e.data.arg.file.name,     id: e.data.arg.id,     uploadPath: e.data.arg.uploadPath,     size: e.data.arg.file.size,     progress: 0,     speed: 0,     status: 'waiting',     pieces: new Array(Math.ceil(e.data.arg.file.size / pieceSize)).fill(false),     errorMessage: undefined,   });   updateStatus(uploadTasks[uploadTasks.length - 1]);   await runNextUpload(); }; /**  * 注册状态接收器  * @param e 消息事件  * @param sender 发送者  */ const registerStatusReceiver = (   e: MessageEvent<MessageArg<string>>,   sender?: MessagePort ) => {   if (sender) statusReceivers.set(e.data.arg, sender); }; /**  * 注销状态接收器  * @param e 消息事件  */ const unregisterStatusReceiver = (e: MessageEvent<MessageArg<string>>) => {   statusReceivers.delete(e.data.arg); }; /**  * 更新 token  * @param e 消息事件  */ const updateToken = (e: MessageEvent<MessageArg<string>>) => {   tokenStore.BearerToken = 'Bearer ' + e.data.arg; }; /**  * 暂停上传  * @param e 消息事件  */ const pauseUpload = (e: MessageEvent<MessageArg<string>>) => {   const task = uploadTasks.find((item) => item.id === e.data.arg);   if (task) {     task.status = 'paused';     if (task.abort) {       task.abort.abort();     }     updateStatus(task);   } }; /**  * 取消上传  * @param e 消息事件  */ const cancelUpload = (e: MessageEvent<MessageArg<string>>) => {   const task = uploadTasks.find((item) => item.id === e.data.arg);   if (task) {     task.status = 'canceled';     if (task.abort) {       task.abort.abort();     }     deleteUpload(task.id);     updateStatus(task);   } }; /**  * 删除上传  * @param id 任务 id  */ const deleteUpload = async (id: string) => {   uploadTasks.splice(     uploadTasks.findIndex((item) => item.id === id),     1   );   hashs.delete(id);   await fetch(`/api/File/Upload?id=${id}`, {     method: 'DELETE',     headers: {       Authorization: tokenStore.BearerToken,     },   }).then((res) => res.json()); }; /**  * 消息路由  */ const messageRoute = new Map<   string,   (e: MessageEvent<MessageArg<any>>, sender?: MessagePort) => void >([   ['queueUpload', queueUpload],   ['registerStatusReceiver', registerStatusReceiver],   ['updateToken', updateToken],   ['pauseUpload', pauseUpload],   ['cancelUpload', cancelUpload],   ['unregisterStatusReceiver', unregisterStatusReceiver], ]); // 监听连接 _self.onconnect = (e) => {   const port = e.ports[0];   port.onmessage = async (e) => {     // 调用函数     const func = messageRoute.get(e.data.func);     if (func) {       func(e, port);     }   };   port.start(); };  

upload-service.ts 代码

import UploadWorker from './upload-worker?sharedworker'; import { onUnmounted, ref, watch } from 'vue'; import { storeToRefs } from 'pinia'; import { useAuthStore } from 'src/stores/auth'; /**  * 上传任务信息  */ interface UploadTaskInfo {   /**    * 文件名    */   fileName: string;   /**    * 上传路径    */   uploadPath: string;   /**    * 任务 id    */   id: string;   /**    * 文件大小    */   size: number;   /**    * 上传进度    */   progress: number;   /**    * 上传速度    */   speed?: number;   /**    * 任务状态    */   status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';   /**    * 开始时间    */   startTime?: Date;   /**    * 结束时间    */   endTime?: Date;   /**    * 错误信息    */   errorMessage?: string; } /**  * 上传服务  */ export const useUploadService = () => {   const store = storeToRefs(useAuthStore());   // 创建共享 worker   const worker = new UploadWorker();   /**    * 上传任务列表    */   const uploadTasks = ref<Map<string, UploadTaskInfo>>(     new Map<string, UploadTaskInfo>()   );   // 是否已注册状态接收器   const isRegistered = ref(false);   // 服务 id   const serviceId = crypto.randomUUID();   // 监听上传任务列表变化(只有在注册状态接收器后才会收到消息)   worker.port.onmessage = (e: MessageEvent<UploadTaskInfo>) => {     uploadTasks.value.set(e.data.id, e.data);   };   // 更新 token   worker.port.postMessage({     func: 'updateToken',     arg: store.token.value,   });   watch(store.token, (token) => {     worker.port.postMessage({       func: 'updateToken',       arg: token,     });   });   /**    * 排队上传    * @param file 文件    * @param uploadPath 上传路径    */   const queueUpload = (file: File, uploadPath: string) => {     worker.port.postMessage({       func: 'queueUpload',       arg: {         id: crypto.randomUUID(),         file: file,         uploadPath: uploadPath,       },     });   };   /**    * 暂停上传    * @param id 任务 id    */   const pauseUpload = (id: string) => {     worker.port.postMessage({       func: 'pauseUpload',       arg: id,     });   };   /**    * 取消上传    * @param id 任务 id    */   const cancelUpload = (id: string) => {     worker.port.postMessage({       func: 'cancelUpload',       arg: id,     });   };   /**    * 注册状态接收器    */   const registerStatusReceiver = () => {     worker.port.postMessage({       func: 'registerStatusReceiver',       arg: serviceId,     });     isRegistered.value = true;   };   /**    * 注销状态接收器    */   const unregisterStatusReceiver = () => {     worker.port.postMessage({       func: 'unregisterStatusReceiver',       arg: serviceId,     });     isRegistered.value = false;   };   onUnmounted(() => {     unregisterStatusReceiver();     worker.port.close();   });   return {     uploadTasks,     queueUpload,     pauseUpload,     cancelUpload,     registerStatusReceiver,     unregisterStatusReceiver,   }; };  

04. 用法

// 引入组合式函数 const uploadService = useUploadService(); // 注册状态接收器 uploadService.registerStatusReceiver(); // 表单绑定上传方法 const upload = (file: File, filePath: string) => {   uploadService.queueUpload(file, filePath); } // 监听上传进度,当然也可以直接展示在界面,毕竟是 Ref watch(uploadService.uploadTasks, console.log);