diff --git a/python/examples/pipeline/ocr/benchmark.py b/python/examples/pipeline/ocr/benchmark.py index fbe0397e046645bbb3251a9f9a404acdc57da34b..337abca8745029d9b790c463d7cfb2956190ef12 100644 --- a/python/examples/pipeline/ocr/benchmark.py +++ b/python/examples/pipeline/ocr/benchmark.py @@ -68,19 +68,40 @@ def run_http(idx, batch_size): latency_list = [] total_number = 0 for img_file in os.listdir(test_img_dir): + l_start = time.time() with open(os.path.join(test_img_dir, img_file), 'rb') as file: image_data1 = file.read() image = cv2_to_base64(image_data1) data = {"key": ["image"], "value": [image]} - for i in range(100): - r = requests.post(url=url, data=json.dumps(data)) + #for i in range(100): + r = requests.post(url=url, data=json.dumps(data)) + print(r.json()) end = time.time() - return [[end - start]] + l_end = time.time() + latency_list.append(l_end * 1000 - l_start * 1000) + total_number = total_number + 1 + return [[end - start], latency_list, [total_number]] def multithread_http(thread, batch_size): multi_thread_runner = MultiThreadRunner() result = multi_thread_runner.run(run_http, thread, batch_size) + start = time.time() + result = multi_thread_runner.run(run_http, thread, batch_size) + end = time.time() + total_cost = end - start + avg_cost = 0 + total_number = 0 + for i in range(thread): + avg_cost += result[0][i] + total_number += result[2][i] + avg_cost = avg_cost / thread + print("Total cost: {}s".format(total_cost)) + print("Each thread cost: {}s. ".format(avg_cost)) + print("Total count: {}. ".format(total_number)) + print("AVG QPS: {} samples/s".format(batch_size * total_number / + total_cost)) + show_latency(result[1]) def run_rpc(thread, batch_size): @@ -92,19 +113,38 @@ def run_rpc(thread, batch_size): latency_list = [] total_number = 0 for img_file in os.listdir(test_img_dir): + l_start = time.time() with open(os.path.join(test_img_dir, img_file), 'rb') as file: image_data = file.read() image = cv2_to_base64(image_data) - - for i in range(100): - ret = client.predict(feed_dict={"image": image}, fetch=["res"]) + ret = client.predict(feed_dict={"image": image}, fetch=["res"]) + print(ret) + l_end = time.time() + latency_list.append(l_end * 1000 - l_start * 1000) + total_number = total_number + 1 end = time.time() - return [[end - start]] + return [[end - start], latency_list, [total_number]] def multithread_rpc(thraed, batch_size): multi_thread_runner = MultiThreadRunner() result = multi_thread_runner.run(run_rpc, thread, batch_size) + start = time.time() + result = multi_thread_runner.run(run_rpc, thread, batch_size) + end = time.time() + total_cost = end - start + avg_cost = 0 + total_number = 0 + for i in range(thread): + avg_cost += result[0][i] + total_number += result[2][i] + avg_cost = avg_cost / thread + print("Total cost: {}s".format(total_cost)) + print("Each thread cost: {}s. ".format(avg_cost)) + print("Total count: {}. ".format(total_number)) + print("AVG QPS: {} samples/s".format(batch_size * total_number / + total_cost)) + show_latency(result[1]) if __name__ == "__main__": diff --git a/python/examples/pipeline/ocr/benchmark.sh b/python/examples/pipeline/ocr/benchmark.sh index d789b94b98d03c87bfcc1ee7520e13336abdca2f..bf9ac2b0473aa84ce3be739377251c3a5ee1fa53 100644 --- a/python/examples/pipeline/ocr/benchmark.sh +++ b/python/examples/pipeline/ocr/benchmark.sh @@ -1,59 +1,88 @@ export FLAGS_profile_pipeline=1 -alias python3="python3.7" +alias python3="python3.6" modelname="ocr" + # HTTP -ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 +#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 sleep 3 -python3 benchmark.py yaml local_predictor 1 gpu +# Create yaml,If you already have the config.yaml, ignore it. +#python3 benchmark.py yaml local_predictor 1 gpu rm -rf profile_log_$modelname -for thread_num in 1 8 16 + +echo "Starting HTTP Clients..." +# Start a client in each thread, tesing the case of multiple threads. +for thread_num in 1 2 4 8 12 16 do for batch_size in 1 do - echo "----Bert thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname - rm -rf PipelineServingLogs - rm -rf cpu_utilization.py - python3 web_service.py >web.log 2>&1 & - sleep 3 - nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 & - nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 & + echo '----$modelname thread num: $thread_num batch size: $batch_size mode:http ----' >>profile_log_$modelname + # Start one web service, If you start the service yourself, you can ignore it here. + #python3 web_service.py >web.log 2>&1 & + #sleep 3 + + # --id is the serial number of the GPU card, Must be the same as the gpu id used by the server. + nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 & + nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 & echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py - python3 benchmark.py run http $thread_num $batch_size - python3 cpu_utilization.py >>profile_log_$modelname - ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 - python3 benchmark.py dump benchmark.log benchmark.tmp - mv benchmark.tmp benchmark.log + # Start http client + python3 benchmark.py run http $thread_num $batch_size > profile 2>&1 + + # Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization + python3 cpu_utilization.py >> profile_log_$modelname + grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname - awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname - cat benchmark.log >> profile_log_$modelname - #rm -rf gpu_use.log gpu_utilization.log + awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname + + # Show profiles + python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname + tail -n 8 profile >> profile_log_$modelname + echo '' >> profile_log_$modelname done done + +# Kill all nvidia-smi background task. +pkill nvidia-smi + +echo "Starting RPC Clients..." + # RPC -ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 +#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 sleep 3 -python3 benchmark.py yaml local_predictor 1 gpu -for thread_num in 1 8 16 +# Create yaml,If you already have the config.yaml, ignore it. +#python3 benchmark.py yaml local_predictor 1 gpu +rm -rf profile_log_$modelname + +# Start a client in each thread, tesing the case of multiple threads. +for thread_num in 1 2 4 6 8 12 16 do for batch_size in 1 do - echo "----Bert thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname - rm -rf PipelineServingLogs - rm -rf cpu_utilization.py - python3 web_service.py >web.log 2>&1 & - sleep 3 - nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 & - nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 & + echo "----$modelname thread num: $thread_num batch size: $batch_size mode:rpc ----" >> profile_log_$modelname + # Start one web service, If you start the service yourself, you can ignore it here. + #python3 web_service.py >web.log 2>&1 & + #sleep 3 + + # --id is the serial number of the GPU card, Must be the same as the gpu id used by the server. + nvidia-smi --id=3 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 & + nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 & echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py - python3 benchmark.py run rpc $thread_num $batch_size - python3 cpu_utilization.py >>profile_log_$modelname - ps -ef | grep web_service | awk '{print $2}' | xargs kill -9 - python3 benchmark.py dump benchmark.log benchmark.tmp - mv benchmark.tmp benchmark.log + + # Start http client + python3 benchmark.py run rpc $thread_num $batch_size > profile 2>&1 + + # Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization + python3 cpu_utilization.py >> profile_log_$modelname + grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname - awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname - #rm -rf gpu_use.log gpu_utilization.log - cat benchmark.log >> profile_log_$modelname + awk -F" " '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname + + # Show profiles + python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname + tail -n 8 profile >> profile_log_$modelname + echo "" >> profile_log_$modelname done done + +# Kill all nvidia-smi background task. +pkill nvidia-smi diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py index 554d161fe30cd573a9c0a4a5a4b55402952cc1fa..e91c813c455174224ff014c7cb4c288f16924fca 100644 --- a/python/examples/pipeline/ocr/web_service.py +++ b/python/examples/pipeline/ocr/web_service.py @@ -45,6 +45,7 @@ class DetOp(Op): imgs = [] for key in input_dict.keys(): data = base64.b64decode(input_dict[key].encode('utf8')) + self.raw_im = data data = np.frombuffer(data, np.uint8) self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) self.ori_h, self.ori_w, _ = self.im.shape @@ -61,7 +62,7 @@ class DetOp(Op): ] dt_boxes_list = self.post_func(det_out, [ratio_list]) dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) - out_dict = {"dt_boxes": dt_boxes, "image": self.im} + out_dict = {"dt_boxes": dt_boxes, "image": self.raw_im} return out_dict, None, "" @@ -73,7 +74,9 @@ class RecOp(Op): def preprocess(self, input_dicts, data_id, log_id): (_, input_dict), = input_dicts.items() - im = input_dict["image"] + raw_im = input_dict["image"] + data = np.frombuffer(raw_im, np.uint8) + im = cv2.imdecode(data, cv2.IMREAD_COLOR) dt_boxes = input_dict["dt_boxes"] dt_boxes = self.sorted_boxes(dt_boxes) feed_list = [] @@ -99,7 +102,7 @@ class RecOp(Op): """ ## Many mini-batchs, the type of feed_data is list. - max_batch_size = 6 # len(dt_boxes) + max_batch_size = len(dt_boxes) # If max_batch_size is 0, skipping predict stage if max_batch_size == 0: diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 14c4a25ccdeff590a75947ffac85dc33b4cb419b..f1851d6281f4422848eb81ef3224ef2f93ccc01c 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -122,6 +122,17 @@ class ChannelData(object): self.client_need_profile = client_need_profile self.profile_data_set = set() + def get_size(self): + size = 0 + dict_data = None + if isinstance(self.dictdata, dict): + for k in self.dictdata: + size += sys.getsizeof(self.dictdata[k]) + sys.getsizeof(k) + if isinstance(self.npdata, dict): + for k in self.npdata: + size += sys.getsizeof(self.npdata[k]) + sys.getsizeof(k) + return size + def add_profile(self, profile_set): if self.client_need_profile is False: self.client_need_profile = True @@ -213,10 +224,10 @@ class ChannelData(object): else: return 1 - def __str__(self): - return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_data[{}]".format( + def get_all_data(self): + return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_size[{}]".format( ChannelDataType(self.datatype).name, self.error_code, self.id, - self.log_id, str(self.dictdata)) + self.log_id, self.get_size()) class ProcessChannel(object): @@ -313,8 +324,10 @@ class ProcessChannel(object): def push(self, channeldata, op_name=None): _LOGGER.debug( - self._log("(data_id={} log_id={}) Op({}) Enter channel::push". - format(channeldata.id, channeldata.log_id, op_name))) + self._log( + "(data_id={} log_id={}) Op({}) Enter channel::push producers:{}". + format(channeldata.id, channeldata.log_id, op_name, + len(self._producers)))) if len(self._producers) == 0: _LOGGER.critical( self._log( @@ -323,19 +336,30 @@ class ProcessChannel(object): format(channeldata.id, channeldata.log_id, op_name))) os._exit(-1) elif len(self._producers) == 1: + start_time = _time() with self._cv: + enter_cv_time = _time() + push_que_time = enter_cv_time while self._stop.value == 0: try: self._que.put((channeldata.id, { op_name: channeldata }), timeout=0) + push_que_time = _time() break except Queue.Full: self._cv.wait() if self._stop.value == 1: raise ChannelStopError() self._cv.notify_all() + notify_all_time = _time() + _LOGGER.debug( + "(data_id={}) Op({}) channel push cost! enter_cv:{} ms, push_que:{} ms, notify:{} ms, data_size:{}". + format(channeldata.id, op_name, (enter_cv_time - start_time) + * 1000, (push_que_time - enter_cv_time) * 1000, ( + notify_all_time - push_que_time) * 1000, + channeldata.get_size())) _LOGGER.debug( self._log( "(data_id={} log_id={}) Op({}) Pushed data into internal queue.". @@ -414,10 +438,15 @@ class ProcessChannel(object): os._exit(-1) elif len(self._consumer_cursors) == 1: resp = None + time_1 = int(round(_time() * 1000000)) + time_2 = time_1 + time_3 = time_2 with self._cv: + time_2 = int(round(_time() * 1000000)) while self._stop.value == 0 and resp is None: try: resp = self._que.get(timeout=0)[1] + time_3 = int(round(_time() * 1000000)) break except Queue.Empty: if timeout is not None: @@ -432,7 +461,12 @@ class ProcessChannel(object): self._cv.wait() if self._stop.value == 1: raise ChannelStopError() - + key = list(resp.keys())[0] + data_id = resp[key].id + _LOGGER.debug( + "(data_id={}) op({}) front cost enter_cv:{} ms, queue_get:{} ms". + format(data_id, op_name, (time_2 - time_1) / 1000.0, ( + time_3 - time_2) / 1000.0)) if resp is not None: list_values = list(resp.values()) _LOGGER.debug( @@ -485,6 +519,7 @@ class ProcessChannel(object): if self._stop.value == 1: raise ChannelStopError() + time_1 = int(round(_time() * 1000000)) consumer_cursor = self._consumer_cursors[op_name] base_cursor = self._base_cursor.value data_idx = consumer_cursor - base_cursor @@ -519,6 +554,8 @@ class ProcessChannel(object): self._cursor_count[new_consumer_cursor] += 1 self._cv.notify_all() + time_2 = int(round(_time() * 1000000)) + #_LOGGER.warning("self._cv logic cost:{}".format(time2 - time1)) if resp is not None: list_values = list(resp.values()) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index b2e861d2ba3f99647fdea17066834044bd18973b..6eb57a328f77ca2e7346fd5d73e0267582577f02 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -123,7 +123,7 @@ class Op(object): if self._auto_batching_timeout is None: self._auto_batching_timeout = conf["auto_batching_timeout"] if self._auto_batching_timeout <= 0 or self._batch_size == 1: - _LOGGER.warning( + _LOGGER.debug( self._log( "Because auto_batching_timeout <= 0 or batch_size == 1," " set auto_batching_timeout to None.")) @@ -1005,6 +1005,7 @@ class Op(object): for idx in range(batch_size): try: channeldata_dict = None + front_start_time = int(round(_time() * 1000000)) if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: @@ -1017,8 +1018,8 @@ class Op(object): channeldata_dict = input_channel.front(op_name) batch.append(channeldata_dict) _LOGGER.debug( - "_auto_batching_generator get {} channeldata from op:{} into batch, batch_size:{}". - format(idx, op_name, batch_size)) + "_auto_batching_generator get {} channeldata from op:{} input channel. time={}". + format(idx, op_name, front_start_time)) except ChannelTimeoutError: _LOGGER.debug("{} Failed to generate batch: " "timeout".format(op_info_prefix)) @@ -1152,6 +1153,13 @@ class Op(object): # data in the whole batch is all error data continue + # print + front_cost = int(round(_time() * 1000000)) - start + for data_id, parsed_data in parsed_data_dict.items(): + _LOGGER.debug( + "(data_id={}) POP INPUT CHANNEL! op:{}, cost:{} ms".format( + data_id, self.name, front_cost / 1000.0)) + # preprecess start = profiler.record("prep#{}_0".format(op_info_prefix)) preped_data_dict, err_channeldata_dict, skip_process_dict \ @@ -1199,6 +1207,7 @@ class Op(object): = self._run_postprocess(parsed_data_dict, midped_data_dict, op_info_prefix, logid_dict) end = profiler.record("postp#{}_1".format(op_info_prefix)) postp_time = end - start + after_postp_time = _time() try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -1212,7 +1221,6 @@ class Op(object): break if len(postped_data_dict) == 0: continue - # push data to channel (if run succ) start = int(round(_time() * 1000000)) try: @@ -1226,12 +1234,21 @@ class Op(object): profile_str=profile_str, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) + after_outchannel_time = _time() + _LOGGER.debug( + "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms". + format(data_id, self.name, (after_outchannel_time - + after_postp_time) * 1000)) + _LOGGER.debug( + "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}". + format(data_id, self.name, postped_data.get_all_data())) except ChannelStopError: _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break end = int(round(_time() * 1000000)) out_time = end - start + after_outchannel_time = int(round(_time() * 1000000)) if trace_buffer is not None: trace_que.append({ "name": self.name, @@ -1345,14 +1362,7 @@ class RequestOp(Op): raise ValueError("request is None") for idx, key in enumerate(request.key): - data = request.value[idx] - try: - evaled_data = eval(data) - if isinstance(evaled_data, np.ndarray): - data = evaled_data - except Exception as e: - pass - dict_data[key] = data + dict_data[key] = request.value[idx] log_id = request.logid _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \ name:{}, method:{}".format(log_id, request.clientip, request.name,