1/*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16class DbThread {
17  busy: boolean = false;
18  id: number = -1;
19  //@ts-ignore
20  taskMap: unknow = {};
21  worker?: Worker;
22  traceId: string;
23
24  constructor(worker: Worker, traceId: string) {
25    this.worker = worker;
26    this.traceId = traceId;
27  }
28
29  uuid(): string {
30    // @ts-ignore
31    return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c: unknow) =>
32      (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16)
33    );
34  }
35
36  //@ts-ignore
37  queryFunc(name: string, sql: string, args: unknow, handler: Function, action: string | null): void {
38    this.busy = true;
39    let id = this.uuid();
40    this.taskMap[id] = handler;
41    let msg = {
42      id: id,
43      name: name,
44      action: action || 'exec',
45      sql: sql,
46      params: args,
47    };
48    this.worker?.postMessage(msg);
49  }
50
51  //@ts-ignore
52  queryProto(name: number, args: unknow, handler: Function): void {
53    this.busy = true;
54    let id = this.uuid();
55    this.taskMap[id] = handler;
56    let msg = {
57      id: id,
58      name: name,
59      action: 'exec-proto',
60      params: args,
61    };
62    this.worker?.postMessage(msg);
63  }
64
65  cutFileByRange(
66    leftTs: number,
67    rightTs: number,
68    handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void
69  ): void {
70    this.busy = true;
71    let id = this.uuid();
72    //@ts-ignore
73    this.taskMap[id] = (res: unknow): void => {
74      setThreadPoolTraceBuffer(this.traceId, res.buffer);
75      if (res.cutStatus) {
76        handler(res.cutStatus, res.msg, res.cutBuffer);
77      } else {
78        handler(res.cutStatus, res.msg);
79      }
80    };
81    caches.match(getThreadPoolTraceBufferCacheKey(this.traceId)).then((resData) => {
82      if (resData) {
83        resData.arrayBuffer().then((buffer) => {
84          this.worker!.postMessage(
85            {
86              id: id,
87              action: 'cut-file',
88              leftTs: leftTs,
89              rightTs: rightTs,
90              buffer: buffer!,
91            },
92            [buffer!]
93          );
94        });
95      }
96    });
97  }
98
99  dbOpen = async (
100    parseConfig: string,
101    sdkWasmConfig?: string,
102    buffer?: ArrayBuffer
103  ): Promise<{
104    status: boolean;
105    msg: string;
106    buffer: ArrayBuffer;
107    //@ts-ignore
108    sdkConfigMap: unknow;
109    fileKey: string;
110  }> => {
111    //@ts-ignore
112    return new Promise<unknow>((resolve, reject) => {
113      let id = this.uuid();
114      //@ts-ignore
115      this.taskMap[id] = (res: unknow): unknow => {
116        if (res.init) {
117          resolve({
118            status: res.init,
119            msg: res.msg,
120            sdkConfigMap: res.configSqlMap,
121            buffer: res.buffer,
122            fileKey: res.fileKey,
123          });
124        } else {
125          resolve({ status: res.init, msg: res.msg });
126        }
127      };
128      this.worker?.postMessage(
129        {
130          id: id,
131          action: 'open',
132          parseConfig: parseConfig,
133          wasmConfig: sdkWasmConfig,
134          buffer: buffer! /*Optional. An ArrayBuffer representing an SQLite Database file*/,
135        },
136        [buffer!]
137      );
138    });
139  };
140
141  resetWASM(): void {
142    this.worker?.postMessage({
143      id: this.uuid(),
144      action: 'reset',
145    });
146  }
147}
148
149export class DbPool {
150  sharedBuffer: ArrayBuffer | null = null;
151  fileCacheKey: string = 'null';
152  traceId: string;
153  maxThreadNumber: number = 0;
154  works: Array<DbThread> = [];
155  progress: Function | undefined | null;
156  num = Math.floor(Math.random() * 10 + 1) + 20;
157  cutDownTimer: unknown | undefined;
158  currentWasmThread: DbThread | undefined = undefined;
159
160  constructor(traceId: string) {
161    this.traceId = traceId;
162  }
163
164  init = async (type: string, threadBuild: (() => DbThread) | undefined = undefined): Promise<void> => {
165    // wasm | server | sqlite
166    if (this.currentWasmThread) {
167      this.currentWasmThread.resetWASM();
168      this.currentWasmThread = undefined;
169    }
170    await this.close();
171    this.maxThreadNumber = 1;
172    for (let i = 0; i < this.maxThreadNumber; i++) {
173      let thread: DbThread | undefined;
174      if (threadBuild) {
175        thread = threadBuild();
176      } else {
177        if (type === 'wasm') {
178          thread = new DbThread(new Worker(new URL('./TraceWorker', import.meta.url)), this.traceId);
179        } else if (type === 'server') {
180          thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url)), this.traceId);
181        } else if (type === 'sqlite') {
182          thread = new DbThread(new Worker(new URL('./SqlLiteWorker', import.meta.url)), this.traceId);
183        }
184      }
185      if (thread) {
186        this.currentWasmThread = thread;
187        thread!.worker!.onerror = (err): void => {
188          console.warn(err);
189        };
190        thread!.worker!.onmessageerror = (err): void => {
191          console.warn(err);
192        };
193        this.threadPostMessage(thread);
194        thread!.id = i;
195        thread!.busy = false;
196        this.works?.push(thread!);
197      }
198    }
199  };
200  threadPostMessage(thread: DbThread): void {
201    thread!.worker!.onmessage = (event: MessageEvent): void => {
202      thread!.busy = false;
203      if (Reflect.has(thread!.taskMap, event.data.id)) {
204        if (event.data.results) {
205          let fun = thread!.taskMap[event.data.id];
206          if (fun) {
207            fun(event.data.results, event.data.len, event.data.transfer, event.data.isEmpty);
208          }
209          Reflect.deleteProperty(thread!.taskMap, event.data.id);
210        } else if (Reflect.has(event.data, 'cutStatus')) {
211          let fun = thread!.taskMap[event.data.id];
212          if (fun) {
213            fun(event.data);
214          }
215        } else if (Reflect.has(event.data, 'ready')) {
216          this.progress!('database opened', this.num + event.data.index);
217          this.progressTimer(this.num + event.data.index, this.progress!);
218          this.sharedBuffer = null;
219        } else if (Reflect.has(event.data, 'init')) {
220          if (this.cutDownTimer !== undefined) {
221            //@ts-ignore
222            clearInterval(this.cutDownTimer);
223          }
224          let fun = thread!.taskMap[event.data.id];
225          if (!event.data.init && !event.data.status) {
226            if (fun) {
227              fun(['error', event.data.msg]);
228            }
229          } else {
230            this.progress!('database ready', 40);
231            if (fun) {
232              fun(event.data);
233            }
234          }
235          Reflect.deleteProperty(thread!.taskMap, event.data.id);
236        } else {
237          let fun = thread!.taskMap[event.data.id];
238          if (fun) {
239            fun([]);
240          }
241          Reflect.deleteProperty(thread!.taskMap, event.data.id);
242        }
243      }
244    };
245  }
246
247  initServer = async (url: string, progress: Function): Promise<{ status: boolean; msg: string }> => {
248    this.progress = progress;
249    progress('database loaded', 15);
250    this.sharedBuffer = await fetch(url).then((res) => res.arrayBuffer());
251    progress('open database', 20);
252    for (let thread of this.works) {
253      let { status, msg } = await thread.dbOpen('');
254      if (!status) {
255        this.sharedBuffer = null;
256        return { status, msg };
257      }
258    }
259    return { status: true, msg: 'ok' };
260  };
261  initSqlite = async (
262    buf: ArrayBuffer,
263    parseConfig: string,
264    sdkWasmConfig: string,
265    progress: Function
266  ): Promise<
267    | {
268      status: false;
269      msg: string;
270      sdkConfigMap?: undefined;
271    }
272    | {
273      status: boolean;
274      msg: string;
275      //@ts-ignore
276      sdkConfigMap: unknow;
277    }
278  > => {
279    this.progress = progress;
280    progress('database loaded', 15);
281    this.sharedBuffer = buf;
282    progress('parse database', 20);
283    let configMap;
284    for (let thread of this.works) {
285      let { status, msg, buffer, sdkConfigMap, fileKey } = await thread.dbOpen(parseConfig, sdkWasmConfig, buf);
286      if (!status) {
287        this.sharedBuffer = null;
288        return { status, msg };
289      } else {
290        configMap = sdkConfigMap;
291        this.sharedBuffer = buffer;
292        if (fileKey !== '-1') {
293          this.fileCacheKey = fileKey;
294        } else {
295          this.fileCacheKey = `trace/${new Date().getTime()}-${buffer.byteLength}`;
296          this.saveTraceFileBuffer(this.fileCacheKey, buffer).then();
297        }
298      }
299    }
300    return { status: true, msg: 'ok', sdkConfigMap: configMap };
301  };
302
303  async saveTraceFileBuffer(key: string, buffer: ArrayBuffer): Promise<void> {
304    await this.obligateFileBufferSpace(buffer.byteLength);
305    caches.open(key).then((cache) => {
306      let headers = new Headers();
307      headers.append('Content-Length', `${buffer.byteLength}`);
308      headers.append('Content-Type', 'application/octet-stream');
309      cache
310        .put(
311          key,
312          new Response(buffer, {
313            status: 200,
314            headers: headers,
315          })
316        )
317        .then();
318    });
319  }
320
321  /**
322   * 计算预留缓存空间,如果空间不够,则删除部分缓存
323   * @param size
324   */
325  async obligateFileBufferSpace(size: number): Promise<void> {
326    let es = await navigator.storage.estimate();
327    let remainderByte = (es.quota || 0) - (es.usage || 0) - 20 * 1024 * 1024;
328    if (remainderByte < size) {
329      let keys = await caches.keys();
330      keys.sort((keyA, keyB) => {
331        if (keyA.includes('/') && keyB.includes('/')) {
332          let splitA = keyA.split('/');
333          let splitB = keyB.split('/');
334          let timeA = splitA[splitA.length - 1].split('-')[0];
335          let timeB = splitB[splitB.length - 1].split('-')[0];
336          return parseInt(timeA) - parseInt(timeB);
337        } else {
338          return 0;
339        }
340      });
341      let needSize = size - remainderByte;
342      for (let key of keys) {
343        await caches.delete(key);
344        let keySize = parseInt(key.split('-')[1]);
345        if (keySize > needSize) {
346          return;
347        } else {
348          needSize -= keySize;
349        }
350      }
351    }
352  }
353
354  close = async (): Promise<void> => {
355    //@ts-ignore
356    clearInterval(this.cutDownTimer);
357    for (let thread of this.works) {
358      thread.worker?.terminate();
359    }
360    this.works.length = 0;
361  };
362
363  async reset(): Promise<void> {
364    if (this.currentWasmThread) {
365      this.currentWasmThread.resetWASM();
366      this.currentWasmThread = undefined;
367    }
368    await this.close();
369  }
370
371  //@ts-ignore
372  submit(name: string, sql: string, args: unknow, handler: Function, action: string | null): void {
373    let noBusyThreads = this.works.filter((it) => !it.busy);
374    let thread: DbThread;
375    if (noBusyThreads.length > 0) {
376      //取第一个空闲的线程进行任务
377      thread = noBusyThreads[0];
378      thread.queryFunc(name, sql, args, handler, action);
379    } else {
380      // 随机插入一个线程中
381      thread = this.works[Math.floor(Math.random() * this.works.length)];
382      thread.queryFunc(name, sql, args, handler, action);
383    }
384  }
385
386  //@ts-ignore
387  submitProto(name: number, args: unknow, handler: Function): void {
388    let noBusyThreads = this.works.filter((it) => !it.busy);
389    let thread: DbThread;
390    if (noBusyThreads.length > 0) {
391      //取第一个空闲的线程进行任务
392      thread = noBusyThreads[0];
393      thread.queryProto(name, args, handler);
394    } else {
395      // 随机插入一个线程中
396      thread = this.works[Math.floor(Math.random() * this.works.length)];
397      if (thread) {
398        thread.queryProto(name, args, handler);
399      }
400    }
401  }
402
403  cutFile(
404    leftTs: number,
405    rightTs: number,
406    handler: (status: boolean, msg: string, splitBuffer?: ArrayBuffer) => void
407  ): void {
408    let noBusyThreads = this.works.filter((it) => !it.busy);
409    let thread: DbThread;
410    if (noBusyThreads.length > 0) {
411      thread = noBusyThreads[0];
412      thread.cutFileByRange(leftTs, rightTs, handler);
413    } else {
414      thread = this.works[Math.floor(Math.random() * this.works.length)];
415      thread.cutFileByRange(leftTs, rightTs, handler);
416    }
417  }
418
419  progressTimer(num: number, progress: Function): void {
420    let currentNum = num;
421    //@ts-ignore
422    clearInterval(this.cutDownTimer);
423    this.cutDownTimer = setInterval(() => {
424      currentNum += Math.floor(Math.random() * 3);
425      if (currentNum >= 50) {
426        progress('database opened', 40);
427        //@ts-ignore
428        clearInterval(this.cutDownTimer);
429      } else {
430        progress('database opened', currentNum);
431      }
432    }, Math.floor(Math.random() * 2500 + 1000));
433  }
434}
435
436export const threadPool = new DbPool('1');
437export const threadPool2 = new DbPool('2');
438
439export interface ThreadPoolConfig {
440  action?: string | null,
441  traceId?: string | null | undefined
442}
443
444export function getThreadPool(traceId?: string | null): DbPool {
445  return traceId === '2' ? threadPool2 : threadPool;
446}
447
448export function query<T>(
449  name: string,
450  sql: string,
451  args: unknown = null,
452  config?: ThreadPoolConfig
453): Promise<Array<T>> {
454  return new Promise<Array<T>>((resolve, reject): void => {
455    getThreadPool(config?.traceId).submit(
456      name,
457      sql,
458      args,
459      (res: Array<T>) => {
460        if (res[0] && res[0] === 'error') {
461          window.publish(window.SmartEvent.UI.Error, res[1]);
462          reject(res);
463        } else {
464          resolve(res);
465        }
466      },
467      config ? (config.action || null) : null
468    );
469  });
470}
471
472export function setThreadPoolTraceBuffer(traceId: string, buf: ArrayBuffer | null): void {
473  if (traceId === threadPool2.traceId) {
474    threadPool2.sharedBuffer = buf;
475  } else {
476    threadPool.sharedBuffer = buf;
477  }
478}
479
480export function getThreadPoolTraceBuffer(traceId: string): ArrayBuffer | null {
481  return traceId === threadPool2.traceId ? threadPool2.sharedBuffer : threadPool.sharedBuffer;
482}
483
484export function setThreadPoolTraceBufferCacheKey(traceId: string, key: string): void {
485  if (traceId === threadPool2.traceId) {
486    threadPool2.fileCacheKey = key;
487  } else {
488    threadPool.fileCacheKey = key;
489  }
490}
491
492export function getThreadPoolTraceBufferCacheKey(traceId: string): string {
493  return traceId === threadPool2.traceId ? threadPool2.fileCacheKey : threadPool.fileCacheKey;
494}
495