image_seg_client.py 5.1 KB
Newer Older
W
wuzewu 已提交
1
# coding: utf-8
Z
zhoushunjie 已提交
2
import os
W
wuzewu 已提交
3 4 5 6 7 8 9
import cv2
import requests
import json
import base64
import numpy as np
import time
import threading
Z
zhoushunjie 已提交
10
import re
W
wuzewu 已提交
11 12

#分割服务的地址
Z
zhoushunjie 已提交
13
IMAGE_SEG_URL = 'http://xxx.xxx.xxx.xxx:8010/ImageSegService/inference'
W
wuzewu 已提交
14 15

class ClientThread(threading.Thread):
Z
zhoushunjie 已提交
16
    def __init__(self, thread_id, image_data_repo):
W
wuzewu 已提交
17 18
        threading.Thread.__init__(self)
        self.__thread_id = thread_id
Z
zhoushunjie 已提交
19
        self.__image_data_repo = image_data_repo
W
wuzewu 已提交
20 21

    def run(self):
Z
zhoushunjie 已提交
22
        self.__request_image_seg_service()
W
wuzewu 已提交
23

Z
zhoushunjie 已提交
24
    def __request_image_seg_service(self):
W
wuzewu 已提交
25
        # 持续发送150个请求
Z
zhoushunjie 已提交
26
        for i in range(1, 151):
W
wuzewu 已提交
27
            print("Epoch %d, thread %d" % (i, self.__thread_id))
Z
zhoushunjie 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
            self.__benchmark_test()

    # benchmark test
    def __benchmark_test(self):
        start = time.time()
        for image_filename in self.__image_data_repo:
            mask_mat_list = self.__request_predictor_server(image_filename)
            input_img = self.__image_data_repo.get_image_matrix(image_filename)
            # 将获得的mask matrix转换成可视化图像,并在当前目录下保存为图像文件
            # 如果进行压测,可以把这句话注释掉
            for j in range(len(mask_mat_list)):
                self.__visualization(mask_mat_list[j], image_filename, 2, input_img)
        latency = time.time() - start
        print("total latency = %f s" % (latency))

    # 对预测结果进行可视化
    # input_raw_mask 是server返回的预测结果
    # output_img 是可视化结果存储路径
    def __visualization(self, mask_mat, output_img, num_cls, input_img):
        # ColorMap for visualization more clearly
        n = num_cls
        color_map = []
        for j in range(n):
            lab = j
            a = b = c = 0
            color_map.append([a, b, c])
            i = 0
            while lab:
                color_map[j][0] |= (((lab >> 0) & 1) << (7 - i))
                color_map[j][1] |= (((lab >> 1) & 1) << (7 - i))
                color_map[j][2] |= (((lab >> 2) & 1) << (7 - i))
                i += 1
                lab >>= 3
        im = cv2.imdecode(mask_mat, 1)
        w, h, c = im.shape
        im2 = cv2.resize(im, (w, h))
        im = im2
        # I = aF + (1-a)B
        a = im / 255.0
        im = a * input_img + (1 - a) * [255, 255, 255]
        cv2.imwrite(output_img, im)

    def __request_predictor_server(self, input_img):
        data = {"instances": [self.__get_item_json(input_img)]}
        response = requests.post(IMAGE_SEG_URL, data=json.dumps(data))
        try:
            response = json.loads(response.text)
            prediction_list = response["prediction"]
            mask_response_list = [mask_response["info"] for mask_response in prediction_list]
            mask_raw_list = [json.loads(mask_response)["mask"] for mask_response in mask_response_list]
        except Exception as err:
            print("Exception[%s], server_message[%s]" % (str(err), response.text))
            return None
        # 使用 json 协议回复的包也是 base64 编码过的
        mask_binary_list = [base64.b64decode(mask_raw) for mask_raw in mask_raw_list]
        m = [np.fromstring(mask_binary, np.uint8) for mask_binary in mask_binary_list]
        return m

    # 请求预测服务
    # input_img 要预测的图片列表
    def __get_item_json(self, input_img):
        # 使用 http 协议请求服务时, 请使用 base64 编码发送图片
        item_binary_b64 = str(base64.b64encode(self.__image_data_repo.get_image_binary(input_img)), 'utf-8')
        item_size = len(item_binary_b64)
        item_json = {
            "image_length": item_size,
            "image_binary": item_binary_b64
        }
        return item_json
W
wuzewu 已提交
97

Z
zhoushunjie 已提交
98 99
def create_thread_pool(thread_num, image_data_repo):
    return [ClientThread(i + 1, image_data_repo) for i in range(thread_num)]
W
wuzewu 已提交
100 101 102 103 104 105 106 107 108


def run_threads(thread_pool):
    for thread in thread_pool:
        thread.start()

    for thread in thread_pool:
        thread.join()

Z
zhoushunjie 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
class ImageDataRepo:
    def __init__(self, dir_name):
        print("Loading images data...")
        self.__data = {}
        pattern = re.compile(".+\.(jpg|jpeg)", re.I)
        if os.path.isdir(dir_name):
            for image_filename in os.listdir(dir_name):
                if pattern.match(image_filename):
                    full_path = os.path.join(dir_name, image_filename)
                    fp = open(full_path, mode="rb")
                    image_binary_data = fp.read()
                    image_mat_data = cv2.imread(full_path)
                    self.__data[image_filename] = (image_binary_data, image_mat_data)
        else:
            raise Exception("Please use directory to initialize");
        print("Finish loading.")
    def __iter__(self):
        for filename in self.__data:
            yield filename

    def get_image_binary(self, image_name):
        return self.__data[image_name][0]

    def get_image_matrix(self, image_name):
        return self.__data[image_name][1]


W
wuzewu 已提交
136
if __name__ == "__main__":
Z
zhoushunjie 已提交
137 138 139
    #preprocess
    IDR = ImageDataRepo("images")
    thread_pool = create_thread_pool(thread_num=1, image_data_repo=IDR)
W
wuzewu 已提交
140
    run_threads(thread_pool)