未验证 提交 0c783dc2 编写于 作者: Z Zhang Jun 提交者: GitHub

Merge branch 'develop' into fixdoc

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import base64
......@@ -12,6 +26,8 @@ except ImportError:
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin)
......@@ -24,6 +40,7 @@ def parse_benchmark(filein, fileout):
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
def gen_yml(device):
fin = open("config.yml", "r")
config = yaml.load(fin)
......@@ -37,48 +54,96 @@ def gen_yml(device):
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = "http://127.0.0.1:9999/ocr/prediction"
start = time.time()
test_img_dir = "imgs/"
#test_img_dir = "rctw_test/images/"
latency_list = []
total_number = 0
for img_file in os.listdir(test_img_dir):
l_start = time.time()
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
data = {"key": ["image"], "value": [image]}
for i in range(100):
#for i in range(100):
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
end = time.time()
return [[end - start]]
l_end = time.time()
latency_list.append(l_end * 1000 - l_start * 1000)
total_number = total_number + 1
return [[end - start], latency_list, [total_number]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_http , thread, batch_size)
start = time.time()
result = multi_thread_runner.run(run_http, thread, batch_size)
end = time.time()
total_cost = end - start
avg_cost = 0
total_number = 0
for i in range(thread):
avg_cost += result[0][i]
total_number += result[2][i]
avg_cost = avg_cost / thread
print("Total cost: {}s".format(total_cost))
print("Each thread cost: {}s. ".format(avg_cost))
print("Total count: {}. ".format(total_number))
print("AVG QPS: {} samples/s".format(batch_size * total_number /
total_cost))
show_latency(result[1])
def run_rpc(thread, batch_size):
client = PipelineClient()
client.connect(['127.0.0.1:18090'])
start = time.time()
test_img_dir = "imgs/"
#test_img_dir = "rctw_test/images/"
latency_list = []
total_number = 0
for img_file in os.listdir(test_img_dir):
l_start = time.time()
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(100):
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
print(ret)
l_end = time.time()
latency_list.append(l_end * 1000 - l_start * 1000)
total_number = total_number + 1
end = time.time()
return [[end - start]]
return [[end - start], latency_list, [total_number]]
def multithread_rpc(thraed, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_rpc , thread, batch_size)
start = time.time()
result = multi_thread_runner.run(run_rpc, thread, batch_size)
end = time.time()
total_cost = end - start
avg_cost = 0
total_number = 0
for i in range(thread):
avg_cost += result[0][i]
total_number += result[2][i]
avg_cost = avg_cost / thread
print("Total cost: {}s".format(total_cost))
print("Each thread cost: {}s. ".format(avg_cost))
print("Total count: {}. ".format(total_number))
print("AVG QPS: {} samples/s".format(batch_size * total_number /
total_cost))
show_latency(result[1])
if __name__ == "__main__":
if sys.argv[1] == "yaml":
......@@ -98,4 +163,3 @@ if __name__ == "__main__":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.7"
alias python3="python3.6"
modelname="ocr"
# HTTP
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
# Create yaml,If you already have the config.yaml, ignore it.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
for thread_num in 1 8 16
echo "Starting HTTP Clients..."
# Start a client in each thread, tesing the case of multiple threads.
for thread_num in 1 2 4 8 12 16
do
for batch_size in 1
do
echo "----Bert thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo '----$modelname thread num: $thread_num batch size: $batch_size mode:http ----' >>profile_log_$modelname
# Start one web service, If you start the service yourself, you can ignore it here.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card, Must be the same as the gpu id used by the server.
nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run http $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
# Start http client
python3 benchmark.py run http $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
cat benchmark.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo '' >> profile_log_$modelname
done
done
# Kill all nvidia-smi background task.
pkill nvidia-smi
echo "Starting RPC Clients..."
# RPC
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
for thread_num in 1 8 16
# Create yaml,If you already have the config.yaml, ignore it.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
# Start a client in each thread, tesing the case of multiple threads.
for thread_num in 1 2 4 6 8 12 16
do
for batch_size in 1
do
echo "----Bert thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "----$modelname thread num: $thread_num batch size: $batch_size mode:rpc ----" >> profile_log_$modelname
# Start one web service, If you start the service yourself, you can ignore it here.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card, Must be the same as the gpu id used by the server.
nvidia-smi --id=3 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run rpc $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
# Start http client
python3 benchmark.py run rpc $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
cat benchmark.log >> profile_log_$modelname
awk -F" " '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo "" >> profile_log_$modelname
done
done
# Kill all nvidia-smi background task.
pkill nvidia-smi
......@@ -45,6 +45,7 @@ class DetOp(Op):
imgs = []
for key in input_dict.keys():
data = base64.b64decode(input_dict[key].encode('utf8'))
self.raw_im = data
data = np.frombuffer(data, np.uint8)
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
......@@ -61,7 +62,7 @@ class DetOp(Op):
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
out_dict = {"dt_boxes": dt_boxes, "image": self.raw_im}
return out_dict, None, ""
......@@ -73,7 +74,9 @@ class RecOp(Op):
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
raw_im = input_dict["image"]
data = np.frombuffer(raw_im, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
......@@ -99,7 +102,7 @@ class RecOp(Op):
"""
## Many mini-batchs, the type of feed_data is list.
max_batch_size = 6 # len(dt_boxes)
max_batch_size = len(dt_boxes)
# If max_batch_size is 0, skipping predict stage
if max_batch_size == 0:
......
......@@ -122,6 +122,17 @@ class ChannelData(object):
self.client_need_profile = client_need_profile
self.profile_data_set = set()
def get_size(self):
size = 0
dict_data = None
if isinstance(self.dictdata, dict):
for k in self.dictdata:
size += sys.getsizeof(self.dictdata[k]) + sys.getsizeof(k)
if isinstance(self.npdata, dict):
for k in self.npdata:
size += sys.getsizeof(self.npdata[k]) + sys.getsizeof(k)
return size
def add_profile(self, profile_set):
if self.client_need_profile is False:
self.client_need_profile = True
......@@ -213,10 +224,10 @@ class ChannelData(object):
else:
return 1
def __str__(self):
return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_data[{}]".format(
def get_all_data(self):
return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_size[{}]".format(
ChannelDataType(self.datatype).name, self.error_code, self.id,
self.log_id, str(self.dictdata))
self.log_id, self.get_size())
class ProcessChannel(object):
......@@ -313,8 +324,10 @@ class ProcessChannel(object):
def push(self, channeldata, op_name=None):
_LOGGER.debug(
self._log("(data_id={} log_id={}) Op({}) Enter channel::push".
format(channeldata.id, channeldata.log_id, op_name)))
self._log(
"(data_id={} log_id={}) Op({}) Enter channel::push producers:{}".
format(channeldata.id, channeldata.log_id, op_name,
len(self._producers))))
if len(self._producers) == 0:
_LOGGER.critical(
self._log(
......@@ -323,19 +336,30 @@ class ProcessChannel(object):
format(channeldata.id, channeldata.log_id, op_name)))
os._exit(-1)
elif len(self._producers) == 1:
start_time = _time()
with self._cv:
enter_cv_time = _time()
push_que_time = enter_cv_time
while self._stop.value == 0:
try:
self._que.put((channeldata.id, {
op_name: channeldata
}),
timeout=0)
push_que_time = _time()
break
except Queue.Full:
self._cv.wait()
if self._stop.value == 1:
raise ChannelStopError()
self._cv.notify_all()
notify_all_time = _time()
_LOGGER.debug(
"(data_id={}) Op({}) channel push cost! enter_cv:{} ms, push_que:{} ms, notify:{} ms, data_size:{}".
format(channeldata.id, op_name, (enter_cv_time - start_time)
* 1000, (push_que_time - enter_cv_time) * 1000, (
notify_all_time - push_que_time) * 1000,
channeldata.get_size()))
_LOGGER.debug(
self._log(
"(data_id={} log_id={}) Op({}) Pushed data into internal queue.".
......@@ -414,10 +438,15 @@ class ProcessChannel(object):
os._exit(-1)
elif len(self._consumer_cursors) == 1:
resp = None
time_1 = int(round(_time() * 1000000))
time_2 = time_1
time_3 = time_2
with self._cv:
time_2 = int(round(_time() * 1000000))
while self._stop.value == 0 and resp is None:
try:
resp = self._que.get(timeout=0)[1]
time_3 = int(round(_time() * 1000000))
break
except Queue.Empty:
if timeout is not None:
......@@ -432,7 +461,12 @@ class ProcessChannel(object):
self._cv.wait()
if self._stop.value == 1:
raise ChannelStopError()
key = list(resp.keys())[0]
data_id = resp[key].id
_LOGGER.debug(
"(data_id={}) op({}) front cost enter_cv:{} ms, queue_get:{} ms".
format(data_id, op_name, (time_2 - time_1) / 1000.0, (
time_3 - time_2) / 1000.0))
if resp is not None:
list_values = list(resp.values())
_LOGGER.debug(
......@@ -485,6 +519,7 @@ class ProcessChannel(object):
if self._stop.value == 1:
raise ChannelStopError()
time_1 = int(round(_time() * 1000000))
consumer_cursor = self._consumer_cursors[op_name]
base_cursor = self._base_cursor.value
data_idx = consumer_cursor - base_cursor
......@@ -519,6 +554,8 @@ class ProcessChannel(object):
self._cursor_count[new_consumer_cursor] += 1
self._cv.notify_all()
time_2 = int(round(_time() * 1000000))
#_LOGGER.warning("self._cv logic cost:{}".format(time2 - time1))
if resp is not None:
list_values = list(resp.values())
......
......@@ -123,7 +123,7 @@ class Op(object):
if self._auto_batching_timeout is None:
self._auto_batching_timeout = conf["auto_batching_timeout"]
if self._auto_batching_timeout <= 0 or self._batch_size == 1:
_LOGGER.warning(
_LOGGER.debug(
self._log(
"Because auto_batching_timeout <= 0 or batch_size == 1,"
" set auto_batching_timeout to None."))
......@@ -1005,6 +1005,7 @@ class Op(object):
for idx in range(batch_size):
try:
channeldata_dict = None
front_start_time = int(round(_time() * 1000000))
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
......@@ -1017,8 +1018,8 @@ class Op(object):
channeldata_dict = input_channel.front(op_name)
batch.append(channeldata_dict)
_LOGGER.debug(
"_auto_batching_generator get {} channeldata from op:{} into batch, batch_size:{}".
format(idx, op_name, batch_size))
"_auto_batching_generator get {} channeldata from op:{} input channel. time={}".
format(idx, op_name, front_start_time))
except ChannelTimeoutError:
_LOGGER.debug("{} Failed to generate batch: "
"timeout".format(op_info_prefix))
......@@ -1152,6 +1153,13 @@ class Op(object):
# data in the whole batch is all error data
continue
# print
front_cost = int(round(_time() * 1000000)) - start
for data_id, parsed_data in parsed_data_dict.items():
_LOGGER.debug(
"(data_id={}) POP INPUT CHANNEL! op:{}, cost:{} ms".format(
data_id, self.name, front_cost / 1000.0))
# preprecess
start = profiler.record("prep#{}_0".format(op_info_prefix))
preped_data_dict, err_channeldata_dict, skip_process_dict \
......@@ -1199,6 +1207,7 @@ class Op(object):
= self._run_postprocess(parsed_data_dict, midped_data_dict, op_info_prefix, logid_dict)
end = profiler.record("postp#{}_1".format(op_info_prefix))
postp_time = end - start
after_postp_time = _time()
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
......@@ -1212,7 +1221,6 @@ class Op(object):
break
if len(postped_data_dict) == 0:
continue
# push data to channel (if run succ)
start = int(round(_time() * 1000000))
try:
......@@ -1226,12 +1234,21 @@ class Op(object):
profile_str=profile_str,
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
after_outchannel_time = _time()
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms".
format(data_id, self.name, (after_outchannel_time -
after_postp_time) * 1000))
_LOGGER.debug(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}".
format(data_id, self.name, postped_data.get_all_data()))
except ChannelStopError:
_LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
end = int(round(_time() * 1000000))
out_time = end - start
after_outchannel_time = int(round(_time() * 1000000))
if trace_buffer is not None:
trace_que.append({
"name": self.name,
......@@ -1345,14 +1362,7 @@ class RequestOp(Op):
raise ValueError("request is None")
for idx, key in enumerate(request.key):
data = request.value[idx]
try:
evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e:
pass
dict_data[key] = data
dict_data[key] = request.value[idx]
log_id = request.logid
_LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
name:{}, method:{}".format(log_id, request.clientip, request.name,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册