diff --git a/python/examples/pipeline/imdb_model_ensemble/README.md b/python/examples/pipeline/imdb_model_ensemble/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f90024f6af04b7d50fe90eb6e04b71dd703300e3 --- /dev/null +++ b/python/examples/pipeline/imdb_model_ensemble/README.md @@ -0,0 +1,19 @@ +# IMDB model ensemble examples + +## Get models +``` +sh get_data.sh +``` + +## Start servers + +``` +python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log & +python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & +python test_pipeline_server.py &>pipeline.log & +``` + +## Start clients +``` +python test_pipeline_client.py +``` diff --git a/python/examples/pipeline/simple_web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py index 84fb4824238a4f6b84e6861910729c98f99a15ee..4a90286fb477ef15d57b642e10ead7918f6bae68 100644 --- a/python/examples/pipeline/simple_web_service/web_service.py +++ b/python/examples/pipeline/simple_web_service/web_service.py @@ -17,6 +17,7 @@ except ImportError: from paddle_serving_server.web_service import WebService, Op import logging import numpy as np +import sys _LOGGER = logging.getLogger() @@ -31,11 +32,18 @@ class UciOp(Op): log_id, input_dict)) x_value = input_dict["x"] proc_dict = {} - if isinstance(x_value, (str, unicode)): - input_dict["x"] = np.array( - [float(x.strip()) - for x in x_value.split(self.separator)]).reshape(1, 13) - _LOGGER.error("input_dict:{}".format(input_dict)) + if sys.version_info.major == 2: + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) + for x in x_value.split(self.separator)]).reshape(1, 13) + _LOGGER.error("input_dict:{}".format(input_dict)) + else: + if isinstance(x_value, str): + input_dict["x"] = np.array( + [float(x.strip()) + for x in x_value.split(self.separator)]).reshape(1, 13) + _LOGGER.error("input_dict:{}".format(input_dict)) return input_dict, False, None, "" diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py index 424b7e025394467840ae77a696e42cefc5a06eed..814b43acaf52bbf0c066ff4bbdce2a0165508a2d 100644 --- a/python/pipeline/analyse.py +++ b/python/pipeline/analyse.py @@ -312,7 +312,7 @@ class OpAnalyst(object): # reduce op times op_times = { - op_name: sum(step_times.values()) + op_name: sum(list(step_times.values())) for op_name, step_times in op_times.items() } diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index c6d16352b87e92e28b3a6775c512dd29dcedb294..9f06c445d1366ed9151b42f2f6532de8ac986758 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -429,9 +429,12 @@ class ProcessChannel(object): self._cv.wait() if self._stop.value == 1: raise ChannelStopError() - _LOGGER.debug( - self._log("(data_id={} log_id={}) Op({}) Got data".format( - resp.values()[0].id, resp.values()[0].log_id, op_name))) + + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log("(data_id={} log_id={}) Op({}) Got data".format( + list_values[0].id, list_values[0].log_id, op_name))) return resp elif op_name is None: _LOGGER.critical( @@ -458,11 +461,12 @@ class ProcessChannel(object): try: channeldata = self._que.get(timeout=0) self._output_buf.append(channeldata) + list_values = list(channeldata.values()) _LOGGER.debug( self._log( "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer". - format(channeldata.values()[0].id, - channeldata.values()[0].log_id, op_name))) + format(list_values[0].id, list_values[0].log_id, + op_name))) break except Queue.Empty: if timeout is not None: @@ -513,10 +517,12 @@ class ProcessChannel(object): self._cv.notify_all() - _LOGGER.debug( - self._log( - "(data_id={} log_id={}) Op({}) Got data from output_buffer". - format(resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log( + "(data_id={} log_id={}) Op({}) Got data from output_buffer". + format(list_values[0].id, list_values[0].log_id, op_name))) return resp def stop(self): @@ -726,9 +732,11 @@ class ThreadChannel(Queue.PriorityQueue): self._cv.wait() if self._stop: raise ChannelStopError() - _LOGGER.debug( - self._log("(data_id={} log_id={}) Op({}) Got data".format( - resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log("(data_id={} log_id={}) Op({}) Got data".format( + list_values[0].id, list_values[0].log_id, op_name))) return resp elif op_name is None: _LOGGER.critical( @@ -755,11 +763,12 @@ class ThreadChannel(Queue.PriorityQueue): try: channeldata = self.get(timeout=0) self._output_buf.append(channeldata) + list_values = list(channeldata.values()) _LOGGER.debug( self._log( "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer". - format(channeldata.values()[0].id, - channeldata.values()[0].log_id, op_name))) + format(list_values[0].id, list_values[0].log_id, + op_name))) break except Queue.Empty: if timeout is not None: @@ -810,10 +819,12 @@ class ThreadChannel(Queue.PriorityQueue): self._cv.notify_all() - _LOGGER.debug( - self._log( - "(data_id={} log_id={}) Op({}) Got data from output_buffer". - format(resp.values()[0].id, resp.values()[0].log_id, op_name))) + if resp is not None: + list_values = list(resp.values()) + _LOGGER.debug( + self._log( + "(data_id={} log_id={}) Op({}) Got data from output_buffer". + format(list_values[0].id, list_values[0].log_id, op_name))) return resp def stop(self): diff --git a/python/pipeline/local_service_handler.py b/python/pipeline/local_service_handler.py index 4682cd65a6e90bb17747ecb76d119deabcefda73..a73627b69a37325b9895fa8a3217314d0371f539 100644 --- a/python/pipeline/local_service_handler.py +++ b/python/pipeline/local_service_handler.py @@ -125,13 +125,15 @@ class LocalServiceHandler(object): format(self._devices)) raise ValueError("The number of self._devices error") - if concurrency_idx <= 0: + if concurrency_idx < 0: _LOGGER.error("concurrency_idx({}) must be one positive number". format(concurrency_idx)) concurrency_idx = 0 elif concurrency_idx >= device_num: concurrency_idx = concurrency_idx % device_num + _LOGGER.info("GET_CLIENT : concurrency_idx={}, device_num={}".format( + concurrency_idx, device_num)) from paddle_serving_app.local_predict import LocalPredictor if self._local_predictor_client is None: self._local_predictor_client = LocalPredictor() diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 45763da995790964bfdc52311e183058d962fc64..4f488f6538f9faa2ae705378d5a0ae99538a6e5d 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -55,7 +55,7 @@ class Op(object): client_type=None, concurrency=None, timeout=None, - retry=None, + retry=0, batch_size=None, auto_batching_timeout=None, local_service_handler=None): @@ -679,7 +679,7 @@ class Op(object): err_channeldata_dict = collections.OrderedDict() ### if (batch_num == 1 && skip == True) ,then skip the process stage. is_skip_process = False - data_ids = preped_data_dict.keys() + data_ids = list(preped_data_dict.keys()) if len(data_ids) == 1 and skip_process_dict.get(data_ids[0]) == True: is_skip_process = True _LOGGER.info("(data_id={} log_id={}) skip process stage".format( diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index 265f88c444e2484e7e50705b507bf00bbe0db0e1..132cf043cd49f097c4ee47e36ce67f53f022b82a 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -53,10 +53,10 @@ class PipelineClient(object): if logid is None: req.logid = 0 else: - if six.PY2: + if sys.version_info.major == 2: req.logid = long(logid) - elif six.PY3: - req.logid = int(log_id) + elif sys.version_info.major == 3: + req.logid = int(logid) feed_dict.pop("logid") clientip = feed_dict.get("clientip") @@ -71,10 +71,15 @@ class PipelineClient(object): np.set_printoptions(threshold=sys.maxsize) for key, value in feed_dict.items(): req.key.append(key) + + if (sys.version_info.major == 2 and isinstance(value, + (str, unicode)) or + ((sys.version_info.major == 3) and isinstance(value, str))): + req.value.append(value) + continue + if isinstance(value, np.ndarray): req.value.append(value.__repr__()) - elif isinstance(value, (str, unicode)): - req.value.append(value) elif isinstance(value, list): req.value.append(np.array(value).__repr__()) else: