diff --git a/python/examples/cascade_rcnn/test_client.py b/python/examples/cascade_rcnn/test_client.py index 75e25985799ca8d3c65cf46e0d270b063af57ba6..b40e97acc3e84a7dc7411a7ad3c3f8c1dc8171a6 100644 --- a/python/examples/cascade_rcnn/test_client.py +++ b/python/examples/cascade_rcnn/test_client.py @@ -33,7 +33,7 @@ fetch_map = client.predict( "im_shape": np.array(list(im.shape[1:]) + [1.0]) }, fetch=["multiclass_nms_0.tmp_0"], - batch=True) + batch=False) fetch_map["image"] = '000000570688.jpg' print(fetch_map) postprocess(fetch_map) diff --git a/python/examples/pipeline/imdb_model_ensemble/README.md b/python/examples/pipeline/imdb_model_ensemble/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f90024f6af04b7d50fe90eb6e04b71dd703300e3 --- /dev/null +++ b/python/examples/pipeline/imdb_model_ensemble/README.md @@ -0,0 +1,19 @@ +# IMDB model ensemble examples + +## Get models +``` +sh get_data.sh +``` + +## Start servers + +``` +python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log & +python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & +python test_pipeline_server.py &>pipeline.log & +``` + +## Start clients +``` +python test_pipeline_client.py +``` diff --git a/python/examples/pipeline/simple_web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py index 84fb4824238a4f6b84e6861910729c98f99a15ee..4a90286fb477ef15d57b642e10ead7918f6bae68 100644 --- a/python/examples/pipeline/simple_web_service/web_service.py +++ b/python/examples/pipeline/simple_web_service/web_service.py @@ -17,6 +17,7 @@ except ImportError: from paddle_serving_server.web_service import WebService, Op import logging import numpy as np +import sys _LOGGER = logging.getLogger() @@ -31,11 +32,18 @@ class UciOp(Op): log_id, input_dict)) x_value = input_dict["x"] proc_dict = {} - if isinstance(x_value, (str, unicode)): - input_dict["x"] = np.array( - [float(x.strip()) - for x in x_value.split(self.separator)]).reshape(1, 13) - _LOGGER.error("input_dict:{}".format(input_dict)) + if sys.version_info.major == 2: + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) + for x in x_value.split(self.separator)]).reshape(1, 13) + _LOGGER.error("input_dict:{}".format(input_dict)) + else: + if isinstance(x_value, str): + input_dict["x"] = np.array( + [float(x.strip()) + for x in x_value.split(self.separator)]).reshape(1, 13) + _LOGGER.error("input_dict:{}".format(input_dict)) return input_dict, False, None, "" diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py index 424b7e025394467840ae77a696e42cefc5a06eed..814b43acaf52bbf0c066ff4bbdce2a0165508a2d 100644 --- a/python/pipeline/analyse.py +++ b/python/pipeline/analyse.py @@ -312,7 +312,7 @@ class OpAnalyst(object): # reduce op times op_times = { - op_name: sum(step_times.values()) + op_name: sum(list(step_times.values())) for op_name, step_times in op_times.items() } diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index c6d16352b87e92e28b3a6775c512dd29dcedb294..9f06c445d1366ed9151b42f2f6532de8ac986758 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -429,9 +429,12 @@ class ProcessChannel(object): self._cv.wait() if self._stop.value == 1: raise ChannelStopError() - _LOGGER.debug( - self._log("(data_id={} log_id={}) Op({}) Got data".format( - resp.values()[0].id, resp.values()[0].log_id, op_name))) + + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log("(data_id={} log_id={}) Op({}) Got data".format( + list_values[0].id, list_values[0].log_id, op_name))) return resp elif op_name is None: _LOGGER.critical( @@ -458,11 +461,12 @@ class ProcessChannel(object): try: channeldata = self._que.get(timeout=0) self._output_buf.append(channeldata) + list_values = list(channeldata.values()) _LOGGER.debug( self._log( "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer". - format(channeldata.values()[0].id, - channeldata.values()[0].log_id, op_name))) + format(list_values[0].id, list_values[0].log_id, + op_name))) break except Queue.Empty: if timeout is not None: @@ -513,10 +517,12 @@ class ProcessChannel(object): self._cv.notify_all() - _LOGGER.debug( - self._log( - "(data_id={} log_id={}) Op({}) Got data from output_buffer". - format(resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log( + "(data_id={} log_id={}) Op({}) Got data from output_buffer". + format(list_values[0].id, list_values[0].log_id, op_name))) return resp def stop(self): @@ -726,9 +732,11 @@ class ThreadChannel(Queue.PriorityQueue): self._cv.wait() if self._stop: raise ChannelStopError() - _LOGGER.debug( - self._log("(data_id={} log_id={}) Op({}) Got data".format( - resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log("(data_id={} log_id={}) Op({}) Got data".format( + list_values[0].id, list_values[0].log_id, op_name))) return resp elif op_name is None: _LOGGER.critical( @@ -755,11 +763,12 @@ class ThreadChannel(Queue.PriorityQueue): try: channeldata = self.get(timeout=0) self._output_buf.append(channeldata) + list_values = list(channeldata.values()) _LOGGER.debug( self._log( "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer". - format(channeldata.values()[0].id, - channeldata.values()[0].log_id, op_name))) + format(list_values[0].id, list_values[0].log_id, + op_name))) break except Queue.Empty: if timeout is not None: @@ -810,10 +819,12 @@ class ThreadChannel(Queue.PriorityQueue): self._cv.notify_all() - _LOGGER.debug( - self._log( - "(data_id={} log_id={}) Op({}) Got data from output_buffer". - format(resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log( + "(data_id={} log_id={}) Op({}) Got data from output_buffer". + format(list_values[0].id, list_values[0].log_id, op_name))) return resp def stop(self): diff --git a/python/pipeline/local_service_handler.py b/python/pipeline/local_service_handler.py index 26450e28dcbf4f99a7149d3005faf8f93abc63f8..a73627b69a37325b9895fa8a3217314d0371f539 100644 --- a/python/pipeline/local_service_handler.py +++ b/python/pipeline/local_service_handler.py @@ -105,18 +105,35 @@ class LocalServiceHandler(object): def get_port_list(self): return self._port_list - def get_client(self): + def get_client(self, concurrency_idx): """ Function get_client is only used for local predictor case, creates one LocalPredictor object, and initializes the paddle predictor by function - load_model_config. + load_model_config.The concurrency_idx is used to select running devices. Args: - None + concurrency_idx: process/thread index Returns: _local_predictor_client """ + + #checking the legality of concurrency_idx. + device_num = len(self._devices) + if device_num <= 0: + _LOGGER.error("device_num must be not greater than 0. devices({})". + format(self._devices)) + raise ValueError("The number of self._devices error") + + if concurrency_idx < 0: + _LOGGER.error("concurrency_idx({}) must be one positive number". + format(concurrency_idx)) + concurrency_idx = 0 + elif concurrency_idx >= device_num: + concurrency_idx = concurrency_idx % device_num + + _LOGGER.info("GET_CLIENT : concurrency_idx={}, device_num={}".format( + concurrency_idx, device_num)) from paddle_serving_app.local_predict import LocalPredictor if self._local_predictor_client is None: self._local_predictor_client = LocalPredictor() @@ -126,7 +143,7 @@ class LocalServiceHandler(object): self._local_predictor_client.load_model_config( model_path=self._model_config, use_gpu=use_gpu, - gpu_id=self._devices[0], + gpu_id=self._devices[concurrency_idx], use_profile=self._use_profile, thread_num=self._thread_num, mem_optim=self._mem_optim, diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 92e0c0c6e0bb2e415f48729d25c2153d2026b6b2..4f488f6538f9faa2ae705378d5a0ae99538a6e5d 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -55,7 +55,7 @@ class Op(object): client_type=None, concurrency=None, timeout=None, - retry=None, + retry=0, batch_size=None, auto_batching_timeout=None, local_service_handler=None): @@ -574,7 +574,7 @@ class Op(object): #Init cuda env in main thread if self.client_type == "local_predictor": _LOGGER.info("Init cuda env in main thread") - self.local_predictor = self._local_service_handler.get_client() + self.local_predictor = self._local_service_handler.get_client(0) threads = [] for concurrency_idx in range(self.concurrency): @@ -679,7 +679,7 @@ class Op(object): err_channeldata_dict = collections.OrderedDict() ### if (batch_num == 1 && skip == True) ,then skip the process stage. is_skip_process = False - data_ids = preped_data_dict.keys() + data_ids = list(preped_data_dict.keys()) if len(data_ids) == 1 and skip_process_dict.get(data_ids[0]) == True: is_skip_process = True _LOGGER.info("(data_id={} log_id={}) skip process stage".format( @@ -1034,7 +1034,8 @@ class Op(object): _LOGGER.info("Init cuda env in process {}".format( concurrency_idx)) - self.local_predictor = self.service_handler.get_client() + self.local_predictor = self.service_handler.get_client( + concurrency_idx) # check all ops initialized successfully. profiler = self._initialize(is_thread_op, concurrency_idx) diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index 265f88c444e2484e7e50705b507bf00bbe0db0e1..132cf043cd49f097c4ee47e36ce67f53f022b82a 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -53,10 +53,10 @@ class PipelineClient(object): if logid is None: req.logid = 0 else: - if six.PY2: + if sys.version_info.major == 2: req.logid = long(logid) - elif six.PY3: - req.logid = int(log_id) + elif sys.version_info.major == 3: + req.logid = int(logid) feed_dict.pop("logid") clientip = feed_dict.get("clientip") @@ -71,10 +71,15 @@ class PipelineClient(object): np.set_printoptions(threshold=sys.maxsize) for key, value in feed_dict.items(): req.key.append(key) + + if (sys.version_info.major == 2 and isinstance(value, + (str, unicode)) or + ((sys.version_info.major == 3) and isinstance(value, str))): + req.value.append(value) + continue + if isinstance(value, np.ndarray): req.value.append(value.__repr__()) - elif isinstance(value, (str, unicode)): - req.value.append(value) elif isinstance(value, list): req.value.append(np.array(value).__repr__()) else: