1/* 2 * Copyright (C) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16class DbThread { 17 busy: boolean = false; 18 id: number = -1; 19 //@ts-ignore 20 taskMap: unknow = {}; 21 worker?: Worker; 22 traceId: string; 23 24 constructor(worker: Worker, traceId: string) { 25 this.worker = worker; 26 this.traceId = traceId; 27 } 28 29 uuid(): string { 30 // @ts-ignore 31 return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c: unknow) => 32 (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16) 33 ); 34 } 35 36 //@ts-ignore 37 queryFunc(name: string, sql: string, args: unknow, handler: Function, action: string | null): void { 38 this.busy = true; 39 let id = this.uuid(); 40 this.taskMap[id] = handler; 41 let msg = { 42 id: id, 43 name: name, 44 action: action || 'exec', 45 sql: sql, 46 params: args, 47 }; 48 this.worker?.postMessage(msg); 49 } 50 51 //@ts-ignore 52 queryProto(name: number, args: unknow, handler: Function): void { 53 this.busy = true; 54 let id = this.uuid(); 55 this.taskMap[id] = handler; 56 let msg = { 57 id: id, 58 name: name, 59 action: 'exec-proto', 60 params: args, 61 }; 62 this.worker?.postMessage(msg); 63 } 64 65 cutFileByRange( 66 leftTs: number, 67 rightTs: number, 68 handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void 69 ): void { 70 this.busy = true; 71 let id = this.uuid(); 72 //@ts-ignore 73 this.taskMap[id] = (res: unknow): void => { 74 setThreadPoolTraceBuffer(this.traceId, res.buffer); 75 if (res.cutStatus) { 76 handler(res.cutStatus, res.msg, res.cutBuffer); 77 } else { 78 handler(res.cutStatus, res.msg); 79 } 80 }; 81 caches.match(getThreadPoolTraceBufferCacheKey(this.traceId)).then((resData) => { 82 if (resData) { 83 resData.arrayBuffer().then((buffer) => { 84 this.worker!.postMessage( 85 { 86 id: id, 87 action: 'cut-file', 88 leftTs: leftTs, 89 rightTs: rightTs, 90 buffer: buffer!, 91 }, 92 [buffer!] 93 ); 94 }); 95 } 96 }); 97 } 98 99 dbOpen = async ( 100 parseConfig: string, 101 sdkWasmConfig?: string, 102 buffer?: ArrayBuffer 103 ): Promise<{ 104 status: boolean; 105 msg: string; 106 buffer: ArrayBuffer; 107 //@ts-ignore 108 sdkConfigMap: unknow; 109 fileKey: string; 110 }> => { 111 //@ts-ignore 112 return new Promise<unknow>((resolve, reject) => { 113 let id = this.uuid(); 114 //@ts-ignore 115 this.taskMap[id] = (res: unknow): unknow => { 116 if (res.init) { 117 resolve({ 118 status: res.init, 119 msg: res.msg, 120 sdkConfigMap: res.configSqlMap, 121 buffer: res.buffer, 122 fileKey: res.fileKey, 123 }); 124 } else { 125 resolve({ status: res.init, msg: res.msg }); 126 } 127 }; 128 this.worker?.postMessage( 129 { 130 id: id, 131 action: 'open', 132 parseConfig: parseConfig, 133 wasmConfig: sdkWasmConfig, 134 buffer: buffer! /*Optional. An ArrayBuffer representing an SQLite Database file*/, 135 }, 136 [buffer!] 137 ); 138 }); 139 }; 140 141 resetWASM(): void { 142 this.worker?.postMessage({ 143 id: this.uuid(), 144 action: 'reset', 145 }); 146 } 147} 148 149export class DbPool { 150 sharedBuffer: ArrayBuffer | null = null; 151 fileCacheKey: string = 'null'; 152 traceId: string; 153 maxThreadNumber: number = 0; 154 works: Array<DbThread> = []; 155 progress: Function | undefined | null; 156 num = Math.floor(Math.random() * 10 + 1) + 20; 157 cutDownTimer: unknown | undefined; 158 currentWasmThread: DbThread | undefined = undefined; 159 160 constructor(traceId: string) { 161 this.traceId = traceId; 162 } 163 164 init = async (type: string, threadBuild: (() => DbThread) | undefined = undefined): Promise<void> => { 165 // wasm | server | sqlite 166 if (this.currentWasmThread) { 167 this.currentWasmThread.resetWASM(); 168 this.currentWasmThread = undefined; 169 } 170 await this.close(); 171 this.maxThreadNumber = 1; 172 for (let i = 0; i < this.maxThreadNumber; i++) { 173 let thread: DbThread | undefined; 174 if (threadBuild) { 175 thread = threadBuild(); 176 } else { 177 if (type === 'wasm') { 178 thread = new DbThread(new Worker(new URL('./TraceWorker', import.meta.url)), this.traceId); 179 } else if (type === 'server') { 180 thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url)), this.traceId); 181 } else if (type === 'sqlite') { 182 thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url)), this.traceId); 183 } 184 } 185 if (thread) { 186 this.currentWasmThread = thread; 187 thread!.worker!.onerror = (err): void => { 188 console.warn(err); 189 }; 190 thread!.worker!.onmessageerror = (err): void => { 191 console.warn(err); 192 }; 193 this.threadPostMessage(thread); 194 thread!.id = i; 195 thread!.busy = false; 196 this.works?.push(thread!); 197 } 198 } 199 }; 200 threadPostMessage(thread: DbThread): void { 201 thread!.worker!.onmessage = (event: MessageEvent): void => { 202 thread!.busy = false; 203 if (Reflect.has(thread!.taskMap, event.data.id)) { 204 if (event.data.results) { 205 let fun = thread!.taskMap[event.data.id]; 206 if (fun) { 207 fun(event.data.results, event.data.len, event.data.transfer, event.data.isEmpty); 208 } 209 Reflect.deleteProperty(thread!.taskMap, event.data.id); 210 } else if (Reflect.has(event.data, 'cutStatus')) { 211 let fun = thread!.taskMap[event.data.id]; 212 if (fun) { 213 fun(event.data); 214 } 215 } else if (Reflect.has(event.data, 'ready')) { 216 this.progress!('database opened', this.num + event.data.index); 217 this.progressTimer(this.num + event.data.index, this.progress!); 218 this.sharedBuffer = null; 219 } else if (Reflect.has(event.data, 'init')) { 220 if (this.cutDownTimer !== undefined) { 221 //@ts-ignore 222 clearInterval(this.cutDownTimer); 223 } 224 let fun = thread!.taskMap[event.data.id]; 225 if (!event.data.init && !event.data.status) { 226 if (fun) { 227 fun(['error', event.data.msg]); 228 } 229 } else { 230 this.progress!('database ready', 40); 231 if (fun) { 232 fun(event.data); 233 } 234 } 235 Reflect.deleteProperty(thread!.taskMap, event.data.id); 236 } else { 237 let fun = thread!.taskMap[event.data.id]; 238 if (fun) { 239 fun([]); 240 } 241 Reflect.deleteProperty(thread!.taskMap, event.data.id); 242 } 243 } 244 }; 245 } 246 247 initServer = async (url: string, progress: Function): Promise<{ status: boolean; msg: string }> => { 248 this.progress = progress; 249 progress('database loaded', 15); 250 this.sharedBuffer = await fetch(url).then((res) => res.arrayBuffer()); 251 progress('open database', 20); 252 for (let thread of this.works) { 253 let { status, msg } = await thread.dbOpen(''); 254 if (!status) { 255 this.sharedBuffer = null; 256 return { status, msg }; 257 } 258 } 259 return { status: true, msg: 'ok' }; 260 }; 261 initSqlite = async ( 262 buf: ArrayBuffer, 263 parseConfig: string, 264 sdkWasmConfig: string, 265 progress: Function 266 ): Promise< 267 | { 268 status: false; 269 msg: string; 270 sdkConfigMap?: undefined; 271 } 272 | { 273 status: boolean; 274 msg: string; 275 //@ts-ignore 276 sdkConfigMap: unknow; 277 } 278 > => { 279 this.progress = progress; 280 progress('database loaded', 15); 281 this.sharedBuffer = buf; 282 progress('parse database', 20); 283 let configMap; 284 for (let thread of this.works) { 285 let { status, msg, buffer, sdkConfigMap, fileKey } = await thread.dbOpen(parseConfig, sdkWasmConfig, buf); 286 if (!status) { 287 this.sharedBuffer = null; 288 return { status, msg }; 289 } else { 290 configMap = sdkConfigMap; 291 this.sharedBuffer = buffer; 292 if (fileKey !== '-1') { 293 this.fileCacheKey = fileKey; 294 } else { 295 this.fileCacheKey = `trace/${new Date().getTime()}-${buffer.byteLength}`; 296 this.saveTraceFileBuffer(this.fileCacheKey, buffer).then(); 297 } 298 } 299 } 300 return { status: true, msg: 'ok', sdkConfigMap: configMap }; 301 }; 302 303 async saveTraceFileBuffer(key: string, buffer: ArrayBuffer): Promise<void> { 304 await this.obligateFileBufferSpace(buffer.byteLength); 305 caches.open(key).then((cache) => { 306 let headers = new Headers(); 307 headers.append('Content-Length', `${buffer.byteLength}`); 308 headers.append('Content-Type', 'application/octet-stream'); 309 cache 310 .put( 311 key, 312 new Response(buffer, { 313 status: 200, 314 headers: headers, 315 }) 316 ) 317 .then(); 318 }); 319 } 320 321 /** 322 * 计算预留缓存空间,如果空间不够,则删除部分缓存 323 * @param size 324 */ 325 async obligateFileBufferSpace(size: number): Promise<void> { 326 let es = await navigator.storage.estimate(); 327 let remainderByte = (es.quota || 0) - (es.usage || 0) - 20 * 1024 * 1024; 328 if (remainderByte < size) { 329 let keys = await caches.keys(); 330 keys.sort((keyA, keyB) => { 331 if (keyA.includes('/') && keyB.includes('/')) { 332 let splitA = keyA.split('/'); 333 let splitB = keyB.split('/'); 334 let timeA = splitA[splitA.length - 1].split('-')[0]; 335 let timeB = splitB[splitB.length - 1].split('-')[0]; 336 return parseInt(timeA) - parseInt(timeB); 337 } else { 338 return 0; 339 } 340 }); 341 let needSize = size - remainderByte; 342 for (let key of keys) { 343 await caches.delete(key); 344 let keySize = parseInt(key.split('-')[1]); 345 if (keySize > needSize) { 346 return; 347 } else { 348 needSize -= keySize; 349 } 350 } 351 } 352 } 353 354 close = async (): Promise<void> => { 355 //@ts-ignore 356 clearInterval(this.cutDownTimer); 357 for (let thread of this.works) { 358 thread.worker?.terminate(); 359 } 360 this.works.length = 0; 361 }; 362 363 async reset(): Promise<void> { 364 if (this.currentWasmThread) { 365 this.currentWasmThread.resetWASM(); 366 this.currentWasmThread = undefined; 367 } 368 await this.close(); 369 } 370 371 //@ts-ignore 372 submit(name: string, sql: string, args: unknow, handler: Function, action: string | null): void { 373 let noBusyThreads = this.works.filter((it) => !it.busy); 374 let thread: DbThread; 375 if (noBusyThreads.length > 0) { 376 //取第一个空闲的线程进行任务 377 thread = noBusyThreads[0]; 378 thread.queryFunc(name, sql, args, handler, action); 379 } else { 380 // 随机插入一个线程中 381 thread = this.works[Math.floor(Math.random() * this.works.length)]; 382 thread.queryFunc(name, sql, args, handler, action); 383 } 384 } 385 386 //@ts-ignore 387 submitProto(name: number, args: unknow, handler: Function): void { 388 let noBusyThreads = this.works.filter((it) => !it.busy); 389 let thread: DbThread; 390 if (noBusyThreads.length > 0) { 391 //取第一个空闲的线程进行任务 392 thread = noBusyThreads[0]; 393 thread.queryProto(name, args, handler); 394 } else { 395 // 随机插入一个线程中 396 thread = this.works[Math.floor(Math.random() * this.works.length)]; 397 if (thread) { 398 thread.queryProto(name, args, handler); 399 } 400 } 401 } 402 403 cutFile( 404 leftTs: number, 405 rightTs: number, 406 handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void 407 ): void { 408 let noBusyThreads = this.works.filter((it) => !it.busy); 409 let thread: DbThread; 410 if (noBusyThreads.length > 0) { 411 thread = noBusyThreads[0]; 412 thread.cutFileByRange(leftTs, rightTs, handler); 413 } else { 414 thread = this.works[Math.floor(Math.random() * this.works.length)]; 415 thread.cutFileByRange(leftTs, rightTs, handler); 416 } 417 } 418 419 progressTimer(num: number, progress: Function): void { 420 let currentNum = num; 421 //@ts-ignore 422 clearInterval(this.cutDownTimer); 423 this.cutDownTimer = setInterval(() => { 424 currentNum += Math.floor(Math.random() * 3); 425 if (currentNum >= 50) { 426 progress('database opened', 40); 427 //@ts-ignore 428 clearInterval(this.cutDownTimer); 429 } else { 430 progress('database opened', currentNum); 431 } 432 }, Math.floor(Math.random() * 2500 + 1000)); 433 } 434} 435 436export const threadPool = new DbPool('1'); 437export const threadPool2 = new DbPool('2'); 438 439export interface ThreadPoolConfig { 440 action?: string | null, 441 traceId?: string | null | undefined 442} 443 444export function getThreadPool(traceId?: string | null): DbPool { 445 return traceId === '2' ? threadPool2 : threadPool; 446} 447 448export function query<T>( 449 name: string, 450 sql: string, 451 args: unknown = null, 452 config?: ThreadPoolConfig 453): Promise<Array<T>> { 454 return new Promise<Array<T>>((resolve, reject): void => { 455 getThreadPool(config?.traceId).submit( 456 name, 457 sql, 458 args, 459 (res: Array<T>) => { 460 if (res[0] && res[0] === 'error') { 461 window.publish(window.SmartEvent.UI.Error, res[1]); 462 reject(res); 463 } else { 464 resolve(res); 465 } 466 }, 467 config ? (config.action || null) : null 468 ); 469 }); 470} 471 472export function setThreadPoolTraceBuffer(traceId: string, buf: ArrayBuffer | null): void { 473 if (traceId === threadPool2.traceId) { 474 threadPool2.sharedBuffer = buf; 475 } else { 476 threadPool.sharedBuffer = buf; 477 } 478} 479 480export function getThreadPoolTraceBuffer(traceId: string): ArrayBuffer | null { 481 return traceId === threadPool2.traceId ? threadPool2.sharedBuffer : threadPool.sharedBuffer; 482} 483 484export function setThreadPoolTraceBufferCacheKey(traceId: string, key: string): void { 485 if (traceId === threadPool2.traceId) { 486 threadPool2.fileCacheKey = key; 487 } else { 488 threadPool.fileCacheKey = key; 489 } 490} 491 492export function getThreadPoolTraceBufferCacheKey(traceId: string): string { 493 return traceId === threadPool2.traceId ? threadPool2.fileCacheKey : threadPool.fileCacheKey; 494} 495