Unverified commit c42a5ad6, authored by Jiawei Wang, committed by GitHub

Merge pull request #1209 from TeslaZhao/develop

Update Pipeline benchmark & trace logs & OCR Examples
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import base64
@@ -12,6 +26,8 @@ except ImportError:
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency


def parse_benchmark(filein, fileout):
    with open(filein, "r") as fin:
        res = yaml.load(fin)
@@ -24,6 +40,7 @@ def parse_benchmark(filein, fileout):
    with open(fileout, "w") as fout:
        yaml.dump(res, fout, default_flow_style=False)
def gen_yml(device):
    fin = open("config.yml", "r")
    config = yaml.load(fin)
@@ -33,61 +50,109 @@ def gen_yml(device):
        config["op"]["det"]["local_service_conf"]["device_type"] = 1
        config["op"]["det"]["local_service_conf"]["devices"] = "2"
        config["op"]["rec"]["local_service_conf"]["device_type"] = 1
        config["op"]["rec"]["local_service_conf"]["devices"] = "2"
    with open("config2.yml", "w") as fout:
        yaml.dump(config, fout, default_flow_style=False)


def cv2_to_base64(image):
    return base64.b64encode(image).decode('utf8')
def run_http(idx, batch_size):
    print("start thread ({})".format(idx))
    url = "http://127.0.0.1:9999/ocr/prediction"
    start = time.time()

    test_img_dir = "imgs/"
    #test_img_dir = "rctw_test/images/"
    latency_list = []
    total_number = 0
    for img_file in os.listdir(test_img_dir):
        l_start = time.time()
        with open(os.path.join(test_img_dir, img_file), 'rb') as file:
            image_data1 = file.read()
        image = cv2_to_base64(image_data1)
        data = {"key": ["image"], "value": [image]}
        #for i in range(100):
        r = requests.post(url=url, data=json.dumps(data))
        print(r.json())
        end = time.time()
        l_end = time.time()
        latency_list.append(l_end * 1000 - l_start * 1000)
        total_number = total_number + 1
    return [[end - start], latency_list, [total_number]]
def multithread_http(thread, batch_size):
    multi_thread_runner = MultiThreadRunner()
    start = time.time()
    result = multi_thread_runner.run(run_http, thread, batch_size)
    end = time.time()
    total_cost = end - start
    avg_cost = 0
    total_number = 0
    for i in range(thread):
        avg_cost += result[0][i]
        total_number += result[2][i]
    avg_cost = avg_cost / thread
    print("Total cost: {}s".format(total_cost))
    print("Each thread cost: {}s. ".format(avg_cost))
    print("Total count: {}. ".format(total_number))
    print("AVG QPS: {} samples/s".format(batch_size * total_number /
                                          total_cost))
    show_latency(result[1])
def run_rpc(thread, batch_size):
    client = PipelineClient()
    client.connect(['127.0.0.1:18090'])
    start = time.time()
    test_img_dir = "imgs/"
    #test_img_dir = "rctw_test/images/"
    latency_list = []
    total_number = 0
    for img_file in os.listdir(test_img_dir):
        l_start = time.time()
        with open(os.path.join(test_img_dir, img_file), 'rb') as file:
            image_data = file.read()
        image = cv2_to_base64(image_data)
        ret = client.predict(feed_dict={"image": image}, fetch=["res"])
        print(ret)
        l_end = time.time()
        latency_list.append(l_end * 1000 - l_start * 1000)
        total_number = total_number + 1
    end = time.time()
    return [[end - start], latency_list, [total_number]]
def multithread_rpc(thread, batch_size):
    multi_thread_runner = MultiThreadRunner()
    start = time.time()
    result = multi_thread_runner.run(run_rpc, thread, batch_size)
    end = time.time()
    total_cost = end - start
    avg_cost = 0
    total_number = 0
    for i in range(thread):
        avg_cost += result[0][i]
        total_number += result[2][i]
    avg_cost = avg_cost / thread
    print("Total cost: {}s".format(total_cost))
    print("Each thread cost: {}s. ".format(avg_cost))
    print("Total count: {}. ".format(total_number))
    print("AVG QPS: {} samples/s".format(batch_size * total_number /
                                          total_cost))
    show_latency(result[1])
if __name__ == "__main__": if __name__ == "__main__":
if sys.argv[1] == "yaml": if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3]) thread = int(sys.argv[3])
device = sys.argv[4] device = sys.argv[4]
gen_yml(device) gen_yml(device)
elif sys.argv[1] == "run": elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3]) thread = int(sys.argv[3])
batch_size = int(sys.argv[4]) batch_size = int(sys.argv[4])
if mode == "http": if mode == "http":
...@@ -98,4 +163,3 @@ if __name__ == "__main__": ...@@ -98,4 +163,3 @@ if __name__ == "__main__":
filein = sys.argv[2] filein = sys.argv[2]
fileout = sys.argv[3] fileout = sys.argv[3]
parse_benchmark(filein, fileout) parse_benchmark(filein, fileout)
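
A hedged usage sketch (not part of the patch): driving the helpers above directly from Python instead of through the sys.argv dispatch. It assumes the file is importable as benchmark.py, that config.yml and the imgs/ directory exist, and that a pipeline service is already listening on the ports hard-coded above.

from benchmark import gen_yml, multithread_http, multithread_rpc

gen_yml("gpu")          # writes config2.yml with the GPU settings shown above
multithread_http(4, 1)  # 4 concurrent HTTP client threads, batch_size 1
multithread_rpc(4, 1)   # the same load through the PipelineClient RPC path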
export FLAGS_profile_pipeline=1
alias python3="python3.6"
modelname="ocr"

# HTTP
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
# Create the yaml config. If you already have one, skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname

echo "Starting HTTP Clients..."
# Start a client in each thread to test the multi-thread case.
for thread_num in 1 2 4 8 12 16
do
    for batch_size in 1
    do
        echo "----$modelname thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
        # Start one web service. If you have already started the service yourself, skip this.
        #python3 web_service.py >web.log 2>&1 &
        #sleep 3

        # --id is the index of the GPU card; it must match the gpu id used by the server.
        nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
        nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py

        # Start the HTTP clients.
        python3 benchmark.py run http $thread_num $batch_size > profile 2>&1

        # Collect CPU metrics, filter out momentary zero readings, then record the maximum GPU memory and the average GPU utilization.
        python3 cpu_utilization.py >> profile_log_$modelname
        grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
        awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
        awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname

        # Show profiles.
        python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname
        tail -n 8 profile >> profile_log_$modelname
        echo '' >> profile_log_$modelname
    done
done
# Kill all nvidia-smi background tasks.
pkill nvidia-smi

echo "Starting RPC Clients..."
# RPC
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3

# Create the yaml config. If you already have one, skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname

# Start a client in each thread to test the multi-thread case.
for thread_num in 1 2 4 6 8 12 16
do
    for batch_size in 1
    do
        echo "----$modelname thread num: $thread_num batch size: $batch_size mode:rpc ----" >> profile_log_$modelname
        # Start one web service. If you have already started the service yourself, skip this.
        #python3 web_service.py >web.log 2>&1 &
        #sleep 3

        # --id is the index of the GPU card; it must match the gpu id used by the server.
        nvidia-smi --id=3 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
        nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py

        # Start the RPC clients.
        python3 benchmark.py run rpc $thread_num $batch_size > profile 2>&1

        # Collect CPU metrics, filter out momentary zero readings, then record the maximum GPU memory and the average GPU utilization.
        python3 cpu_utilization.py >> profile_log_$modelname
        grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
        awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
        awk -F" " '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname

        # Show profiles.
        python3 ../../util/show_profile.py profile $thread_num >> profile_log_$modelname
        tail -n 8 profile >> profile_log_$modelname
        echo "" >> profile_log_$modelname
    done
done

# Kill all nvidia-smi background tasks.
pkill nvidia-smi
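
For reference, the cpu_utilization.py helper generated by the echo lines in both loops above amounts to this one-shot sampler (it assumes psutil is installed; whether the \n escapes are interpreted depends on the shell's echo behavior):

import psutil
cpu_utilization = psutil.cpu_percent(1, False)  # average CPU percent over a 1 s window
print('CPU_UTILIZATION:', cpu_utilization)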
@@ -45,6 +45,7 @@ class DetOp(Op):
        imgs = []
        for key in input_dict.keys():
            data = base64.b64decode(input_dict[key].encode('utf8'))
            self.raw_im = data
            data = np.frombuffer(data, np.uint8)
            self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
            self.ori_h, self.ori_w, _ = self.im.shape
@@ -61,7 +62,7 @@ class DetOp(Op):
        ]
        dt_boxes_list = self.post_func(det_out, [ratio_list])
        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
        out_dict = {"dt_boxes": dt_boxes, "image": self.raw_im}
        return out_dict, None, ""
@@ -73,7 +74,9 @@ class RecOp(Op):
    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        raw_im = input_dict["image"]
        data = np.frombuffer(raw_im, np.uint8)
        im = cv2.imdecode(data, cv2.IMREAD_COLOR)
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
@@ -99,7 +102,7 @@ class RecOp(Op):
        """
        ## Many mini-batches; the type of feed_data is list.
        max_batch_size = len(dt_boxes)
        # If max_batch_size is 0, skip the predict stage.
        if max_batch_size == 0:
...
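
The DetOp/RecOp change above forwards the raw encoded image bytes (self.raw_im) through the channel and lets RecOp decode them again, presumably because the encoded JPEG/PNG is far smaller than the decoded HxWx3 array that previously crossed the channel. A rough standalone sketch of that size difference (the image path is illustrative only):

import cv2
import numpy as np

raw = open("imgs/1.jpg", "rb").read()                                   # encoded bytes, as DetOp now forwards them
decoded = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)  # what used to be forwarded
print("encoded bytes: {}".format(len(raw)))
print("decoded ndarray bytes: {}".format(decoded.nbytes))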
@@ -122,6 +122,17 @@ class ChannelData(object):
        self.client_need_profile = client_need_profile
        self.profile_data_set = set()

    def get_size(self):
        size = 0
        dict_data = None
        if isinstance(self.dictdata, dict):
            for k in self.dictdata:
                size += sys.getsizeof(self.dictdata[k]) + sys.getsizeof(k)
        if isinstance(self.npdata, dict):
            for k in self.npdata:
                size += sys.getsizeof(self.npdata[k]) + sys.getsizeof(k)
        return size

    def add_profile(self, profile_set):
        if self.client_need_profile is False:
            self.client_need_profile = True
@@ -213,10 +224,10 @@ class ChannelData(object):
        else:
            return 1

    def get_all_data(self):
        return "type[{}], error_code[{}], data_id[{}], log_id[{}], dict_size[{}]".format(
            ChannelDataType(self.datatype).name, self.error_code, self.id,
            self.log_id, self.get_size())
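
A standalone sketch (illustrative payload only) of the estimate ChannelData.get_size performs above: summing sys.getsizeof over every key and value of the dict payloads, which gives a rough byte count for the data crossing a channel.

import sys
import numpy as np

payload = {"image": b"\x00" * (640 * 480 * 3), "dt_boxes": np.zeros((6, 4, 2))}
size = sum(sys.getsizeof(v) + sys.getsizeof(k) for k, v in payload.items())
print("approximate channel payload size: {} bytes".format(size))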

class ProcessChannel(object):
@@ -313,8 +324,10 @@ class ProcessChannel(object):
    def push(self, channeldata, op_name=None):
        _LOGGER.debug(
            self._log(
                "(data_id={} log_id={}) Op({}) Enter channel::push producers:{}".
                format(channeldata.id, channeldata.log_id, op_name,
                       len(self._producers))))
        if len(self._producers) == 0:
            _LOGGER.critical(
                self._log(
@@ -323,19 +336,30 @@ class ProcessChannel(object):
                    format(channeldata.id, channeldata.log_id, op_name)))
            os._exit(-1)
        elif len(self._producers) == 1:
            start_time = _time()
            with self._cv:
                enter_cv_time = _time()
                push_que_time = enter_cv_time
                while self._stop.value == 0:
                    try:
                        self._que.put((channeldata.id, {
                            op_name: channeldata
                        }),
                                      timeout=0)
                        push_que_time = _time()
                        break
                    except Queue.Full:
                        self._cv.wait()
                if self._stop.value == 1:
                    raise ChannelStopError()
                self._cv.notify_all()
                notify_all_time = _time()
                _LOGGER.debug(
                    "(data_id={}) Op({}) channel push cost! enter_cv:{} ms, push_que:{} ms, notify:{} ms, data_size:{}".
                    format(channeldata.id, op_name, (enter_cv_time - start_time)
                           * 1000, (push_que_time - enter_cv_time) * 1000, (
                               notify_all_time - push_que_time) * 1000,
                           channeldata.get_size()))
            _LOGGER.debug(
                self._log(
                    "(data_id={} log_id={}) Op({}) Pushed data into internal queue.".
@@ -414,10 +438,15 @@ class ProcessChannel(object):
            os._exit(-1)
        elif len(self._consumer_cursors) == 1:
            resp = None
            time_1 = int(round(_time() * 1000000))
            time_2 = time_1
            time_3 = time_2
            with self._cv:
                time_2 = int(round(_time() * 1000000))
                while self._stop.value == 0 and resp is None:
                    try:
                        resp = self._que.get(timeout=0)[1]
                        time_3 = int(round(_time() * 1000000))
                        break
                    except Queue.Empty:
                        if timeout is not None:
@@ -432,7 +461,12 @@ class ProcessChannel(object):
                            self._cv.wait()
                if self._stop.value == 1:
                    raise ChannelStopError()
            key = list(resp.keys())[0]
            data_id = resp[key].id
            _LOGGER.debug(
                "(data_id={}) op({}) front cost enter_cv:{} ms, queue_get:{} ms".
                format(data_id, op_name, (time_2 - time_1) / 1000.0, (
                    time_3 - time_2) / 1000.0))
            if resp is not None:
                list_values = list(resp.values())
                _LOGGER.debug(
@@ -485,6 +519,7 @@ class ProcessChannel(object):
            if self._stop.value == 1:
                raise ChannelStopError()

        time_1 = int(round(_time() * 1000000))
        consumer_cursor = self._consumer_cursors[op_name]
        base_cursor = self._base_cursor.value
        data_idx = consumer_cursor - base_cursor
@@ -519,6 +554,8 @@ class ProcessChannel(object):
                self._cursor_count[new_consumer_cursor] += 1

            self._cv.notify_all()
        time_2 = int(round(_time() * 1000000))
        #_LOGGER.warning("self._cv logic cost:{}".format(time2 - time1))

        if resp is not None:
            list_values = list(resp.values())
...
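
The timing instrumentation added here (and in operator.py below) follows one pattern: take integer microsecond timestamps with int(round(_time() * 1000000)) and report the deltas in milliseconds by dividing by 1000.0. A minimal sketch of that pattern:

from time import time as _time

time_1 = int(round(_time() * 1000000))  # microseconds before the guarded section
# ... queue get / condition-variable work happens here ...
time_2 = int(round(_time() * 1000000))  # microseconds after
print("front cost enter_cv:{} ms".format((time_2 - time_1) / 1000.0))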
@@ -123,7 +123,7 @@ class Op(object):
            if self._auto_batching_timeout is None:
                self._auto_batching_timeout = conf["auto_batching_timeout"]
            if self._auto_batching_timeout <= 0 or self._batch_size == 1:
                _LOGGER.debug(
                    self._log(
                        "Because auto_batching_timeout <= 0 or batch_size == 1,"
                        " set auto_batching_timeout to None."))
@@ -1005,6 +1005,7 @@ class Op(object):
            for idx in range(batch_size):
                try:
                    channeldata_dict = None
                    front_start_time = int(round(_time() * 1000000))
                    if timeout is not None:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
@@ -1017,8 +1018,8 @@ class Op(object):
                        channeldata_dict = input_channel.front(op_name)
                    batch.append(channeldata_dict)
                    _LOGGER.debug(
                        "_auto_batching_generator get {} channeldata from op:{} input channel. time={}".
                        format(idx, op_name, front_start_time))
                except ChannelTimeoutError:
                    _LOGGER.debug("{} Failed to generate batch: "
                                  "timeout".format(op_info_prefix))
@@ -1152,6 +1153,13 @@ class Op(object):
                # data in the whole batch is all error data
                continue

            # Log how long popping the input channel took.
            front_cost = int(round(_time() * 1000000)) - start
            for data_id, parsed_data in parsed_data_dict.items():
                _LOGGER.debug(
                    "(data_id={}) POP INPUT CHANNEL! op:{}, cost:{} ms".format(
                        data_id, self.name, front_cost / 1000.0))

            # preprocess
            start = profiler.record("prep#{}_0".format(op_info_prefix))
            preped_data_dict, err_channeldata_dict, skip_process_dict \
@@ -1199,6 +1207,7 @@ class Op(object):
                = self._run_postprocess(parsed_data_dict, midped_data_dict, op_info_prefix, logid_dict)
            end = profiler.record("postp#{}_1".format(op_info_prefix))
            postp_time = end - start
            after_postp_time = _time()
            try:
                for data_id, err_channeldata in err_channeldata_dict.items():
                    self._push_to_output_channels(
@@ -1212,7 +1221,6 @@ class Op(object):
                    break
            if len(postped_data_dict) == 0:
                continue
            # push data to channel (if run succ)
            start = int(round(_time() * 1000000))
            try:
@@ -1226,12 +1234,21 @@ class Op(object):
                        profile_str=profile_str,
                        client_need_profile=need_profile_dict[data_id],
                        profile_set=profile_dict[data_id])
                    after_outchannel_time = _time()
                    _LOGGER.debug(
                        "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms".
                        format(data_id, self.name, (after_outchannel_time -
                                                    after_postp_time) * 1000))
                    _LOGGER.debug(
                        "(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}".
                        format(data_id, self.name, postped_data.get_all_data()))
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            end = int(round(_time() * 1000000))
            out_time = end - start
            after_outchannel_time = int(round(_time() * 1000000))
            if trace_buffer is not None:
                trace_que.append({
                    "name": self.name,
@@ -1345,14 +1362,7 @@ class RequestOp(Op):
            raise ValueError("request is None")
        for idx, key in enumerate(request.key):
            dict_data[key] = request.value[idx]
        log_id = request.logid
        _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
            name:{}, method:{}".format(log_id, request.clientip, request.name,
...