1# CPU Intensive Task Development (TaskPool and Worker) 2 3 4CPU intensive tasks are tasks that occupy a significant amount of system computing resources and that may block other tasks in the same thread. Example CPU intensive tasks are image processing, video encoding, and data analysis. 5 6 7To improve CPU utilization and application response speeds, use multithread concurrency in processing CPU intensive tasks. 8 9 10If a task can be completed in a background thread within 3 minutes, you are advised to use TaskPool. Otherwise, use Worker. The following uses histogram processing and a time-consuming model prediction task in the background as examples. 11 12 13## Using TaskPool to Process Histograms 14 151. Implement the logic of image processing. 16 172. Segment the data, and initiate associated task scheduling through task groups. 18 19 Create a [task group](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10), call [addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10) to add tasks, and call [execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10) to execute the tasks in the task group at a [a high priority](../reference/apis-arkts/js-apis-taskpool.md#priority). After all the tasks in the task group are complete, the histogram processing result is returned simultaneously. 20 213. Summarize and process the result arrays. 22 23```ts 24import { taskpool } from '@kit.ArkTS'; 25 26@Concurrent 27function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer { 28 // Step 1: Perform specific image processing operations and other time-consuming operations. 29 return dataSlice; 30} 31 32function histogramStatistic(pixelBuffer: ArrayBuffer): void { 33 // Step 2: Perform concurrent scheduling for data in three segments. 34 let number: number = pixelBuffer.byteLength / 3; 35 let buffer1: ArrayBuffer = pixelBuffer.slice(0, number); 36 let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2); 37 let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2); 38 39 let group: taskpool.TaskGroup = new taskpool.TaskGroup(); 40 group.addTask(imageProcessing, buffer1); 41 group.addTask(imageProcessing, buffer2); 42 group.addTask(imageProcessing, buffer3); 43 44 taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => { 45 // Step 3: Summarize and process the result arrays. 46 }) 47} 48 49@Entry 50@Component 51struct Index { 52 @State message: string = 'Hello World' 53 54 build() { 55 Row() { 56 Column() { 57 Text(this.message) 58 .fontSize(50) 59 .fontWeight(FontWeight.Bold) 60 .onClick(() => { 61 let buffer: ArrayBuffer = new ArrayBuffer(24); 62 histogramStatistic(buffer); 63 }) 64 } 65 .width('100%') 66 } 67 .height('100%') 68 } 69} 70``` 71 72 73## Using Worker for Time-Consuming Model Prediction 74 75The following uses the training of a region-specific house price prediction model as an example. This model can be used to predict house prices in the region based on the house area and number of rooms. The model needs to run for a long time, and each prediction needs to use the previous running result. Due to these considerations, Worker is used for the development. 76 771. In DevEco Studio, add a worker named **MyWorker** to your project. 78 79  80 812. In the main thread, call [constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9) of **ThreadWorker** to create a **Worker** object. The calling thread is the host thread. 82 83 ```ts 84 // Index.ets 85 import { worker } from '@kit.ArkTS'; 86 87 const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts'); 88 ``` 89 903. In the host thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9) to receive messages from the worker thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9) to send messages to the worker thread. 91 92 For example, the host thread sends training and prediction messages to the worker thread, and receives messages sent back by the worker thread. 93 94 ```ts 95 // Index.ets 96 let done = false; 97 98 // Receive the result from the worker thread. 99 workerInstance.onmessage = (() => { 100 console.info('MyWorker.ts onmessage'); 101 if (!done) { 102 workerInstance.postMessage({ 'type': 1, 'value': 0 }); 103 done = true; 104 } 105 }) 106 107 workerInstance.onerror = (() => { 108 // Receive error information from the worker thread. 109 }) 110 111 // Send a training message to the worker thread. 112 workerInstance.postMessage({ 'type': 0 }); 113 ``` 114 1154. Bind the **Worker** object in the **MyWorker.ts** file. The calling thread is the worker thread. 116 117 ```ts 118 // MyWorker.ts 119 import { worker, ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@kit.ArkTS'; 120 121 let workerPort: ThreadWorkerGlobalScope = worker.workerPort; 122 ``` 123 1245. In the worker thread, call [onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9-1) to receive messages sent by the host thread, and call [postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2) to send messages to the host thread. 125 126 For example, the prediction model and its training process are defined in the worker thread, and messages are exchanged with the main thread. 127 128 ```ts 129 // MyWorker.ts 130 // Define the training model and result. 131 let result: Array<number>; 132 // Define the prediction function. 133 function predict(x: number): number { 134 return result[x]; 135 } 136 // Define the optimizer training process. 137 function optimize(): void { 138 result = [0]; 139 } 140 // onmessage logic of the worker thread. 141 workerPort.onmessage = (e: MessageEvents): void => { 142 // Perform operations based on the type of data to transmit. 143 switch (e.data.type as number) { 144 case 0: 145 // Perform training. 146 optimize(); 147 // Send a training success message to the main thread after training is complete. 148 workerPort.postMessage({ type: 'message', value: 'train success.' }); 149 break; 150 case 1: 151 // Execute the prediction. 152 const output: number = predict(e.data.value as number); 153 // Send the prediction result to the main thread. 154 workerPort.postMessage({ type: 'predict', value: output }); 155 break; 156 default: 157 workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); 158 break; 159 } 160 } 161 ``` 162 1636. After the task is completed in the worker thread, destroy the worker thread. The worker thread can be destroyed by itself or the host thread. Then, call [onexit()](../reference/apis-arkts/js-apis-worker.md#onexit9) in the host thread to define the processing logic after the worker thread is destroyed. 164 165 ```ts 166 // After the worker thread is destroyed, execute the onexit() callback. 167 workerInstance.onexit = (): void => { 168 console.info("main thread terminate"); 169 } 170 ``` 171 172 Method 1: In the host thread, call [terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9) to destroy the worker thread and stop the worker thread from receiving messages. 173 174 ```ts 175 // Destroy the worker thread. 176 workerInstance.terminate(); 177 ``` 178 179 Method 2: In the worker thread, call [close()](../reference/apis-arkts/js-apis-worker.md#close9) to destroy the worker thread and stop the worker thread from receiving messages. 180 181 ```ts 182 // Destroy the worker thread. 183 workerPort.close(); 184 ``` 185