diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 3b3f58a168938d7c62a2d7e9e0246c925cd1a12f..d2323f265c7fac65bc97d9b8d9a3dea8afe4cf2e 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -60,6 +60,10 @@ class Op(object): self._outputs = [] self._profiler = None + # only for multithread + self._for_init_op_lock = threading.Lock() + self._succ_init_op = False + def init_profiler(self, profiler): self._profiler = profiler @@ -180,13 +184,6 @@ class Op(object): return proces def start_with_thread(self, client_type): - # load user resources - try: - self.init_op() - except Exception as e: - _LOGGER.error(e) - os._exit(-1) - threads = [] for concurrency_idx in range(self.concurrency): t = threading.Thread( @@ -330,8 +327,8 @@ class Op(object): client = None client_predict_handler = None + # create client based on client_type try: - # create client based on client_type client = self.init_client(client_type, self._client_config, self._server_endpoints, self._fetch_names) if client is not None: @@ -340,13 +337,18 @@ class Op(object): _LOGGER.error(log(e)) os._exit(-1) - if not use_multithread: - # load user resources - try: + # load user resources + try: + if use_multithread: + with self._for_init_op_lock: + if not self._succ_init_op: + self.init_op() + self._succ_init_op = True + else: self.init_op() - except Exception as e: - _LOGGER.error(log(e)) - os._exit(-1) + except Exception as e: + _LOGGER.error(log(e)) + os._exit(-1) self._is_run = True while self._is_run: