PROCESS_DATA.md 8.5 KB
Newer Older
J
Jiawei Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
## Paddle Serving 数据处理

### 综述

Paddle Serving提供了非常灵活的pipeline web/rpc服务,因此需要一个统一的教程来指导在数据流的各个阶段,我们的自然数据(文字/图片/稀疏参数表)会以何种形式存在并且传递。本文将以pipeline web service为例。

### pipeline客户端

pipeline客户端只做很简单的处理,他们把自然输入转化成可以序列化的JSON字典或者是对应的protubuf bytes字段即可。

#### 1)字符串/数字

字符串和数字在这个阶段都以字符串的形式存在。我们以[房价预测](../python/examples/pipeline/simple_web_service)作为例子。房价预测的输入是13个维度的浮点数去描述一个住房的特征。在客户端阶段就可以直接如下所示

```
curl -X POST -k http://localhost:18082/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```

我们直接把13个数字当成一整个字符串,中间用逗号`,` 隔开。在这里 key所跟随的列表长度需要和 value所跟随的列表长度相等。

同理,如果是字符串文字输入,在这个阶段不妨直接明文输入,例如Bert在这个阶段不妨可以直接写成

```
curl -X POST -k http://localhost:18082/bert/prediction -d '{"key": ["x"], "value": ["hello world"]}'
```

当然,复杂的处理也可以把这个curl转换成python语言,详情参见[Bert Pipeline示例](../python/examples/pipeline/bert). 

#### 2)图片

图片在Paddle的输入通常需要转换成numpy array,但是在客户端阶段,不需要转换成numpy array,因为那样比较耗费空间,在这个阶段我们用base64 string来传输就可以了,到了服务端的前处理再去解读base64转换成numpy array。详情参见[图像分类pipeline示例](../python/examples/pipeline/PaddleClas/DarkNet53/pipeline_http_client.py),我们也贴出部分代码

```python
def cv2_to_base64(image):
    return base64.b64encode(image).decode('utf8')
if __name__ == "__main__":
    url = "http://127.0.0.1:18080/imagenet/prediction"
    with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
        image_data1 = file.read()
    image = cv2_to_base64(image_data1)
    data = {"key": ["image"], "value": [image]}
    for i in range(100):
        r = requests.post(url=url, data=json.dumps(data))
        print(r.json())
```

可以看出经过这样的操作,图片就可以像string一样,成为JSON或者GRPC Protobuf请求的一部分,发送到了服务端。

## pipeline服务端前处理

这些数据到了服务端之后,由于有一个auto batch的阶段,所以服务端程序接受到的是一个列表的python dict,列表里面的每一个dict,对应着我们从客户端发出去的请求。

#### 1)字符串/数字

刚才提到的房价预测示例,[服务端程序](../python/examples/pipeline/simple_web_service/web_service.py)在这里。

```python
    def init_op(self):
        self.separator = ","
        self.batch_separator = ";"

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items() 
        _LOGGER.error("UciOp::preprocess >>> log_id:{}, input:{}".format(
            log_id, input_dict))
        x_value = input_dict["x"].split(self.batch_separator)
        x_lst = []
        for x_val in x_value:
            x_lst.append(
                np.array([
                    float(x.strip()) for x in x_val.split(self.separator)
                ]).reshape(1, 13))
        input_dict["x"] = np.concatenate(x_lst, axis=0)
        proc_dict = {}
        return input_dict, False, None, ""

```

可以看到我们在接收到客户端的请求(请求字典如下)

```json
{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}
```

之后,服务端对字符串的逗号`,`做了分隔。变成了 numpy array,并且shape是[1, 13]。最终需要确保 return的input_dict就是 能够和Paddle Predictor直接做交互的字典。

对于bert服务由于发送的已经是明文,服务端处理程序

```python
    def init_op(self):
        self.reader = ChineseBertReader({
            "vocab_file": "vocab.txt",
            "max_seq_len": 128
        })

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        print("input dict", input_dict)
        batch_size = len(input_dict.keys())
        feed_res = []
        for i in range(batch_size):
            feed_dict = self.reader.process(input_dict[str(i)].encode("utf-8"))
            for key in feed_dict.keys():
                feed_dict[key] = np.array(feed_dict[key]).reshape(
                    (1, len(feed_dict[key]), 1))
            feed_res.append(feed_dict)
        feed_dict = {}
        for key in feed_res[0].keys():
            feed_dict[key] = np.concatenate([x[key] for x in feed_res], axis=0)
            print(key, feed_dict[key].shape)
        return feed_dict, False, None, ""
```

就是由一个bert字典,来处理输入的明文数据,每一句话都生成 与bert seq len长度的浮点数。最终需要确保 return的input_dict就是 能够和Paddle Predictor直接做交互的字典。

#### 2)图片处理

图像的前处理阶段,前面提到的图像处理程序,[服务端程序](../python/examples/pipeline/PaddleClas/DarkNet53/resnet50_web_service.py)如下。

```python
    def init_op(self):
        self.seq = Sequential([
            Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
            Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
                                True)
        ])
        self.label_dict = {}
        label_idx = 0
        with open("imagenet.label") as fin:
            for line in fin:
                self.label_dict[label_idx] = line.strip()
                label_idx += 1

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        batch_size = len(input_dict.keys())
        imgs = []
        for key in input_dict.keys():
            data = base64.b64decode(input_dict[key].encode('utf8'))
            data = np.fromstring(data, np.uint8)
            im = cv2.imdecode(data, cv2.IMREAD_COLOR)
            img = self.seq(im)
            imgs.append(img[np.newaxis, :].copy())
        input_imgs = np.concatenate(imgs, axis=0)
        return {"image": input_imgs}, False, None, ""
```

可以看到我们在收到请求后,先要做base64的decode,然后再做np from string 最后用opencv库imcode,才能完成图片到numpy array的转换,这个时候的数据就可以直接用于Paddle的图像前处理。

我们最后再经过Sequential的 Resize(调整大小),CenterCrop(中央部分裁剪),RGB2BGR(颜色通道转换),Transpose(转置矩阵),Normalize(归一化),最终形成和Paddle模型输入需求相一致的numpy array。

## pipeline服务端预测

预测阶段和Paddle预测一样,我们在preprocess函数给到了所需的输入,就可以不需要额外添加代码,到postprocess端等待输出即可。

## pipeline服务端后处理

后处理阶段函数原型是`def postprocess(self, input_dicts, fetch_dict, log_id):`

我们会获取Paddle预测返回的fetch dict,后处理通常需要这个字典信息。

后处理的方式多种多样,例如前面的房价预测就不要后处理,预测的结果就已经给出了对房价的预测。

图像分类需要做后处理,代码如下

```python
def postprocess(self, input_dicts, fetch_dict, log_id):
        score_list = fetch_dict["prediction"]
        result = {"label": [], "prob": []}
        for score in score_list:
            score = score.tolist()
            max_score = max(score)
            result["label"].append(self.label_dict[score.index(max_score)]
                                   .strip().replace(",", ""))
            result["prob"].append(max_score)
        result["label"] = str(result["label"])
        result["prob"] = str(result["prob"])
        return result, None, ""
```

我们可以看到输出的字典只有 `prediction`的矩阵,只有通过后处理,才能得到这幅图模型判定的label(物体种类),和prob(对该物体的可信度)。

如果是数字和字符串信息,确保return的result可被JSON序列化即可。

通常后处理返回不再需要传输图片,如果需要传输图片,一样需要处理成base64的样子,交给客户端。