cpu-intensive-task-development.md 7.0 KB
Newer Older
G
ge-yafang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# CPU密集型任务开发指导


CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。


基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。


当任务不需要长时间(3分钟)占据后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。


## 使用TaskPool进行图像直方图处理

1. 实现图像处理的业务逻辑。

2. 数据分段,通过任务组发起关联任务调度。
G
ge-yafang 已提交
18
   创建[TaskGroup](../reference/apis/js-apis-taskpool.md#taskgroup10)并通过[addTask()](../reference/apis/js-apis-taskpool.md#addtask10)添加对应的任务,通过[execute()](../reference/apis/js-apis-taskpool.md#taskpoolexecute10)执行任务组,并指定为[高优先级](../reference/apis/js-apis-taskpool.md#priority),在当前任务组所有任务结束后,会将直方图处理结果同时返回。
G
ge-yafang 已提交
19 20 21 22 23 24 25

3. 结果数组汇总处理。

```ts
import taskpool from '@ohos.taskpool';

@Concurrent
26
function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
G
ge-yafang 已提交
27 28 29 30
  // 步骤1: 具体的图像处理操作及其他耗时操作
  return dataSlice;
}

31
function histogramStatistic(pixelBuffer: ArrayBuffer): void {
G
ge-yafang 已提交
32
  // 步骤2: 分成三段并发调度
33 34 35 36
  let number: number = pixelBuffer.byteLength / 3;
  let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
  let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
  let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);
G
ge-yafang 已提交
37

38
  let group: taskpool.TaskGroup = new taskpool.TaskGroup();
G
ge-yafang 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
  group.addTask(imageProcessing, buffer1);
  group.addTask(imageProcessing, buffer2);
  group.addTask(imageProcessing, buffer3);

  taskpool.execute(group, taskpool.Priority.HIGH).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果数组汇总处理
  })
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
60 61
            let buffer: ArrayBuffer = new ArrayBuffer(24);
            histogramStatistic(buffer);
G
ge-yafang 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}
```


## 使用Worker进行长时间数据分析

本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。

1. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。

   ![newWorker](figures/newWorker.png)

80
2. 在主线程中通过调用ThreadWorker的[constructor()](../reference/apis/js-apis-worker.md#constructor9)方法创建Worker对象,当前线程为宿主线程。
81

82 83
    ```ts
    import worker from '@ohos.worker';
84

85 86
    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
    ```
G
ge-yafang 已提交
87

G
ge-yafang 已提交
88
3. 在宿主线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9)方法接收Worker线程发送过来的消息,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。
G
ge-yafang 已提交
89 90
   例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。

91 92 93 94 95 96 97
    ```ts
    import worker  from '@ohos.worker';
    
    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
    
    // 接收Worker子线程的结果
    workerInstance.onmessage = (() => {
G
ge-yafang 已提交
98 99
     console.info('MyWorker.ts onmessage');
     // 在Worker线程中进行耗时操作
100 101 102
    })
    
    workerInstance.onerror = (() => {
G
ge-yafang 已提交
103
     // 接收Worker子线程的错误信息
104 105 106 107 108 109 110
    })
    
    // 向Worker子线程发送训练消息
    workerInstance.postMessage({ 'type': 0 });
    // 向Worker子线程发送预测消息
    workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
    ```
111

G
ge-yafang 已提交
112 113

4. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。
114

115
   ```ts
G
ge-yafang 已提交
116
   import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
117

G
ge-yafang 已提交
118 119 120
   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
   ```

G
ge-yafang 已提交
121
5. 在Worker线程中通过调用[onmessage()](../reference/apis/js-apis-worker.md#onmessage9-1)方法接收宿主线程发送的消息内容,并通过调用[postMessage()](../reference/apis/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。
122 123 124 125 126 127 128 129 130
    例如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。

    ```ts
    import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
    let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
    // 定义训练模型及结果
    let result: Array<number>;
    // 定义预测函数
    function predict(x: number): number {
G
ge-yafang 已提交
131
     return result[x];
132 133 134 135 136 137 138
    }
    // 定义优化器训练过程
    function optimize(): void {
     result = [];
    }
    // Worker线程的onmessage逻辑
    workerPort.onmessage = (e: MessageEvents): void => {
G
ge-yafang 已提交
139
     // 根据传输的数据的type选择进行操作
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
     switch (e.data.type as number) {
      case 0:
      // 进行训练
       optimize();
      // 训练之后发送主线程训练成功的消息
       workerPort.postMessage({ type: 'message', value: 'train success.' });
       break;
      case 1:
      // 执行预测
       const output: number = predict(e.data.value as number);
      // 发送主线程预测的结果
       workerPort.postMessage({ type: 'predict', value: output });
       break;
      default:
       workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
       break;
G
ge-yafang 已提交
156
     }
157 158
    }
    ```
G
ge-yafang 已提交
159

160
6. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。
G
ge-yafang 已提交
161

162
    在宿主线程中通过调用[onexit()](../reference/apis/js-apis-worker.md#onexit9)方法定义Worker线程销毁后的处理逻辑。
163

164 165 166
    ```ts
    // Worker线程销毁后,执行onexit回调方法
    workerInstance.onexit = (): void => {
G
ge-yafang 已提交
167
     console.info("main thread terminate");
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
    }
    ```

    方式一:在宿主线程中通过调用[terminate()](../reference/apis/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收息。
    
    ```ts
    // 销毁Worker线程
    workerInstance.terminate();
    ```
    
    方式二:在Worker线程中通过调用[close()](../reference/apis/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。
    
    ```ts
    // 销毁线程
    workerPort.close();
    ```