1/* 2 * Copyright (C) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16import { query } from './SqlLite'; 17 18class ProcedureThread { 19 busy: boolean = false; 20 isCancelled: boolean = false; 21 id: number = -1; //@ts-ignore 22 taskMap: unknown = {}; 23 name: string | undefined; 24 worker?: Worker; 25 constructor(worker: Worker) { 26 this.worker = worker; 27 } 28 uuid(): string { 29 // @ts-ignore 30 return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) => 31 (c ^ (crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16) 32 ); 33 } 34 //@ts-ignore 35 queryFunc(type: string, args: unknown, transfer: unknown, handler: Function): void { 36 this.busy = true; 37 let id = this.uuid(); // @ts-ignore 38 this.taskMap[id] = handler; 39 let pam = { 40 id: id, 41 type: type, 42 params: args, 43 }; 44 if (transfer) { 45 try { 46 if (Array.isArray(transfer)) { 47 if (transfer.length > 0) { 48 this.worker!.postMessage(pam, [...transfer]); 49 } else { 50 this.worker!.postMessage(pam); 51 } 52 } else { 53 // @ts-ignore 54 this.worker!.postMessage(pam, [transfer]); 55 } 56 } catch ( 57 //@ts-ignore 58 e: unknown 59 ) {} 60 } else { 61 this.worker!.postMessage(pam); 62 } 63 } 64 65 cancel(): void { 66 this.isCancelled = true; 67 this.worker!.terminate(); 68 } 69} 70 71class ProcedurePool { 72 static cpuCount = Math.floor((window.navigator.hardwareConcurrency || 4) / 2); 73 maxThreadNumber: number = 1; 74 works: Array<ProcedureThread> = []; 75 timelineChange: 76 | (( 77 //@ts-ignore 78 a: unknown 79 ) => void) 80 | undefined 81 | null = null; 82 cpusLen = ProcedurePool.build('cpu', 0); 83 freqLen = ProcedurePool.build('freq', 0); 84 processLen = ProcedurePool.build('process', 0); 85 logicDataLen = ProcedurePool.build('logic', 2); 86 names = [...this.cpusLen, ...this.processLen, ...this.freqLen]; 87 logicDataHandles = [...this.logicDataLen]; 88 89 onComplete: Function | undefined; //任务完成回调 90 91 constructor(threadBuild: (() => ProcedureThread) | undefined = undefined) { 92 this.init(threadBuild); 93 } 94 95 static build(name: string, len: number): string[] { 96 return [...Array(len).keys()].map((it) => `${name}${it}`); 97 } 98 99 init(threadBuild: (() => ProcedureThread) | undefined = undefined): void { 100 this.maxThreadNumber = this.names.length; 101 for (let i = 0; i < this.maxThreadNumber; i++) { 102 this.newThread(); 103 } 104 for (let j = 0; j < this.logicDataHandles.length; j++) { 105 this.logicDataThread(); 106 } 107 } 108 109 newThread(): void { 110 // @ts-ignore 111 if (window.useWb) { 112 return; 113 } 114 let newThread: ProcedureThread = new ProcedureThread( 115 new Worker(new URL('./ui-worker/ProcedureWorker', import.meta.url), { 116 type: 'module', 117 }) 118 ); 119 newThread.name = this.names[this.works.length]; 120 newThread.worker!.onmessage = (event: MessageEvent): void => { 121 newThread.busy = false; 122 if ((event.data.type as string) === 'timeline-range-changed') { 123 this.timelineChange?.(event.data.results); 124 newThread.busy = false; 125 return; 126 } // @ts-ignore 127 if (Reflect.has(newThread.taskMap, event.data.id)) { 128 if (event.data) { 129 // @ts-ignore 130 let fun = newThread.taskMap[event.data.id]; 131 if (fun) { 132 fun(event.data.results, event.data.hover); 133 } // @ts-ignore 134 Reflect.deleteProperty(newThread.taskMap, event.data.id); 135 } 136 } 137 if (this.isIdle() && this.onComplete) { 138 this.onComplete(); 139 } 140 }; 141 newThread.worker!.onmessageerror = (e): void => {}; 142 newThread.worker!.onerror = (e): void => {}; 143 newThread.id = this.works.length; 144 newThread.busy = false; 145 this.works?.push(newThread); 146 } 147 148 private logicDataThread(): void { 149 // @ts-ignore 150 if (window.useWb) { 151 return; 152 } 153 let thread: ProcedureThread = new ProcedureThread( 154 new Worker(new URL('./logic-worker/ProcedureLogicWorker', import.meta.url), { 155 type: 'module', 156 }) 157 ); 158 thread.name = this.logicDataHandles[this.works.length - this.names.length]; 159 this.sendMessage(thread); 160 thread.worker!.onmessageerror = (e): void => {}; 161 thread.worker!.onerror = (e): void => {}; 162 thread.id = this.works.length; 163 thread.busy = false; 164 this.works?.push(thread); 165 } 166 167 private sendMessage(thread: ProcedureThread): void { 168 thread.worker!.onmessage = (event: MessageEvent): void => { 169 thread.busy = false; 170 if (event.data.isQuery) { 171 query(event.data.type, event.data.sql, event.data.args, { action : 'exec-buf' }).then( 172 ( 173 // @ts-ignore 174 res: unknown 175 ) => { 176 thread.worker!.postMessage({ 177 type: event.data.type, 178 params: { 179 list: res, 180 }, 181 id: event.data.id, 182 }); 183 } 184 ); 185 return; 186 } 187 if (event.data.isSending) { 188 if ( 189 Reflect.has( 190 // @ts-ignore 191 thread.taskMap, 192 event.data.id 193 ) 194 ) { 195 if (event.data) { 196 // @ts-ignore 197 let fun = thread.taskMap[event.data.id]; 198 if (fun) { 199 fun(event.data.results, event.data.hover); 200 } 201 return; 202 } 203 } 204 } // @ts-ignore 205 if (Reflect.has(thread.taskMap, event.data.id)) { 206 if (event.data) { 207 // @ts-ignore 208 let fun = thread.taskMap[event.data.id]; 209 if (fun) { 210 fun(event.data.results, event.data.hover); 211 } // @ts-ignore 212 Reflect.deleteProperty(thread.taskMap, event.data.id); 213 } 214 } 215 if (this.isIdle() && this.onComplete) { 216 this.onComplete(); 217 } 218 }; 219 } 220 221 close = (): void => { 222 for (let thread of this.works) { 223 thread.worker!.terminate(); 224 } 225 this.works.length = 0; 226 }; 227 228 clearCache = (): void => { 229 for (let thread of this.works) { 230 thread.queryFunc('clear', {}, undefined, () => {}); 231 } 232 }; 233 234 submitWithName(name: string, type: string, args: unknown, transfer: unknown, handler: Function): unknown { 235 let noBusyThreads = this.works.filter((it) => it.name === name); 236 let thread: ProcedureThread | undefined; 237 if (noBusyThreads.length > 0) { 238 //取第一个空闲的线程进行任务 239 thread = noBusyThreads[0]; 240 thread!.queryFunc(type, args, transfer, handler); 241 } 242 return thread; 243 } 244 // @ts-ignore 245 submitWithNamePromise(name: string, type: string, args: unknown, transfer: unknown): Promise<unknown> { 246 return new Promise((resolve, reject) => { 247 let noBusyThreads = this.works.filter((it) => it.name === name); 248 let thread: ProcedureThread | undefined; 249 if (noBusyThreads.length > 0) { 250 //取第一个空闲的线程进行任务 251 thread = noBusyThreads[0]; // @ts-ignore 252 thread!.queryFunc(type, args, transfer, (res: unknown, hover: unknown) => { 253 resolve({ 254 res: res, 255 hover: hover, 256 }); 257 }); 258 } 259 }); 260 } 261 262 isIdle(): boolean { 263 return this.works.every((it) => !it.busy); 264 } 265} 266 267export const procedurePool = new ProcedurePool(); 268