提交 fb5e6ab3 编写于 作者: P peng.xu

refactor max workers in handler

上级 a3409be0
......@@ -3,6 +3,7 @@ import time
import datetime
from collections import defaultdict
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from milvus.grpc_gen import milvus_pb2, milvus_pb2_grpc, status_pb2
from milvus.grpc_gen.milvus_pb2 import TopKQueryResult
......@@ -20,12 +21,13 @@ logger = logging.getLogger(__name__)
class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
MAX_NPROBE = 2048
def __init__(self, conn_mgr, tracer, router, *args, **kwargs):
def __init__(self, conn_mgr, tracer, router, max_workers=multiprocessing.cpu_count(), **kwargs):
self.conn_mgr = conn_mgr
self.table_meta = {}
self.error_handlers = {}
self.tracer = tracer
self.router = router
self.max_workers = max_workers
def connection(self, metadata=None):
conn = self.conn_mgr.conn('WOSERVER', metadata=metadata)
......@@ -102,8 +104,6 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
rs = []
all_topk_results = []
workers = settings.SEARCH_WORKER_SIZE
def search(addr, query_params, vectors, topk, nprobe, **kwargs):
logger.info(
'Send Search Request: addr={};params={};nq={};topk={};nprobe={}'
......@@ -130,7 +130,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
all_topk_results.append(ret)
with self.tracer.start_span('do_search', child_of=p_span) as span:
with ThreadPoolExecutor(max_workers=workers) as pool:
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
for addr, params in routing.items():
res = pool.submit(search,
addr,
......
......@@ -23,7 +23,6 @@ config(LOG_LEVEL, LOG_PATH, LOG_NAME, TIMEZONE)
TIMEOUT = env.int('TIMEOUT', 60)
MAX_RETRY = env.int('MAX_RETRY', 3)
SEARCH_WORKER_SIZE = env.int('SEARCH_WORKER_SIZE', 10)
SERVER_PORT = env.int('SERVER_PORT', 19530)
WOSERVER = env.str('WOSERVER')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册