service_handler.py 27.6 KB
Newer Older
P
peng.xu 已提交
1 2 3
import logging
import time
import datetime
4
import json
P
peng.xu 已提交
5
from collections import defaultdict
B
BossZou 已提交
6
import ujson
P
peng.xu 已提交
7 8 9 10 11 12

import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from milvus.grpc_gen import milvus_pb2, milvus_pb2_grpc, status_pb2
from milvus.grpc_gen.milvus_pb2 import TopKQueryResult
from milvus.client import types as Types
B
BossZou 已提交
13
from milvus import MetricType
P
peng.xu 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27

from mishards import (db, settings, exceptions)
from mishards.grpc_utils import mark_grpc_method
from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser
from mishards import utilities

logger = logging.getLogger(__name__)


class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
    MAX_NPROBE = 2048
    MAX_TOPK = 2048

    def __init__(self, tracer, router, max_workers=multiprocessing.cpu_count(), **kwargs):
J
Jin Hai 已提交
28
        self.collection_meta = {}
P
peng.xu 已提交
29 30 31 32 33
        self.error_handlers = {}
        self.tracer = tracer
        self.router = router
        self.max_workers = max_workers

Y
yhz 已提交
34 35 36 37 38
    def _reduce(self, source_ids, ids, source_diss, diss, k, reverse):
        if source_diss[k - 1] <= diss[0]:
            return source_ids, source_diss
        if diss[k - 1] <= source_diss[0]:
            return ids, diss
Y
yhz 已提交
39 40 41

        source_diss.extend(diss)
        diss_t = enumerate(source_diss)
Y
yhz 已提交
42 43 44
        diss_m_rst = sorted(diss_t, key=lambda x: x[1])[:k]
        diss_m_out = [id_ for _, id_ in diss_m_rst]

Y
yhz 已提交
45 46
        source_ids.extend(ids)
        id_m_out = [source_ids[i] for i, _ in diss_m_rst]
Y
yhz 已提交
47 48 49

        return id_m_out, diss_m_out

P
peng.xu 已提交
50 51 52 53
    def _do_merge(self, files_n_topk_results, topk, reverse=False, **kwargs):
        """Combine the per-node search results into one flat result.

        :param files_n_topk_results: results collected from the searched nodes.
            A tuple item is treated as ``(status, _)`` and aborts the merge with
            that status; any other item must expose ``status``, ``row_num``,
            ``ids`` and ``distances``.
        :param topk: requested number of hits (not used by the merge itself).
        :param reverse: forwarded to ``_reduce``.
        :param kwargs: accepted and ignored.
        :return: ``(status, ids, distances)`` where ``ids`` and ``distances``
            are flat lists holding the merged rows back to back.
        :raises ValueError: when a row index is seen before its predecessors.
        """
        status = status_pb2.Status(error_code=status_pb2.SUCCESS,
                                   reason="Success")
        if not files_n_topk_results:
            return status, [], []

        merged_ids = []
        merged_distances = []

        started_at = time.time()
        for node_result in files_n_topk_results:
            if isinstance(node_result, tuple):
                status, _ = node_result
                return status, [], []

            if node_result.status.error_code != 0:
                return node_result.status, [], []

            row_count = node_result.row_num
            # A zero row count means this node returned nothing.
            if not row_count:
                continue

            ids = node_result.ids
            diss = node_result.distances
            # TODO: batch_len is equal to topk, may need to compare with topk
            batch_len = len(ids) // row_count

            for row in range(row_count):
                lo = row * batch_len
                hi = lo + batch_len
                id_batch = ids[lo:hi]
                dis_batch = diss[lo:hi]

                rows_so_far = len(merged_ids)
                if rows_so_far < row:
                    raise ValueError("merge error")
                if rows_so_far == row:
                    # First batch seen for this row.
                    # TODO: may bug here
                    merged_ids.append(id_batch)
                    merged_distances.append(dis_batch)
                    continue

                merged_ids[row], merged_distances[row] = self._reduce(
                    merged_ids[row], id_batch,
                    merged_distances[row], dis_batch,
                    batch_len,
                    reverse)

        elapsed = time.time() - started_at
        logger.info('Merge takes {}'.format(elapsed))

        flat_ids = [id_ for row_ids in merged_ids for id_ in row_ids]
        flat_distances = [dis for row_diss in merged_distances for dis in row_diss]

        return status, flat_ids, flat_distances
P
peng.xu 已提交
106 107 108

    def _do_query(self,
                  context,
                  collection_id,
                  collection_meta,
                  vectors,
                  topk,
                  search_params,
                  partition_tags=None,
                  **kwargs):
        """Route a search, fan it out over a thread pool and merge the results.

        :param context: gRPC context; used to find the active tracing span.
        :param collection_id: name of the collection to search.
        :param collection_meta: collection description; its ``metric_type``
            decides the merge ordering.
        :param vectors: query records sent to every node.
        :param topk: number of hits requested per query record.
        :param search_params: search parameters forwarded to every node.
        :param partition_tags: partition tags forwarded to the router.
        :param kwargs: may carry ``metadata``, forwarded to the router and merge.
        :return: whatever ``_do_merge`` returns: ``(status, ids, distances)``.
        """
        metadata = kwargs.get('metadata', None)

        routing = {}
        p_span = None if self.tracer.empty else context.get_active_span(
        ).context
        with self.tracer.start_span('get_routing', child_of=p_span):
            routing = self.router.routing(collection_id,
                                          partition_tags=partition_tags,
                                          metadata=metadata)
        logger.info('Routing: {}'.format(routing))

        def search(addr, collection_id, file_ids, vectors, topk, params, **kwargs):
            logger.info(
                'Send Search Request: addr={};collection_id={};ids={};nq={};topk={};params={}'
                    .format(addr, collection_id, file_ids, len(vectors), topk, params))

            conn = self.router.query_conn(addr, metadata=metadata)
            span = kwargs.get('span', None)
            span = span if span else (None if self.tracer.empty else
                                      context.get_active_span().context)

            with self.tracer.start_span('search_{}'.format(addr),
                                        child_of=span):
                ret = conn.conn.search_vectors_in_files(collection_name=collection_id,
                                                        file_ids=file_ids,
                                                        query_records=vectors,
                                                        top_k=topk,
                                                        params=params)
                if ret.status.error_code != 0:
                    logger.error("Search fail {}".format(ret.status))

                # Failed results are returned too; _do_merge inspects the status.
                return ret

        with self.tracer.start_span('do_search', child_of=p_span) as span:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                futures = [
                    pool.submit(search,
                                addr,
                                collection_id,
                                file_ids,
                                vectors,
                                topk,
                                search_params,
                                span=span)
                    for addr, file_ids in routing.items()
                ]

                # Collect in submission order so the merge input does not
                # depend on which worker finishes first; result() re-raises
                # any exception raised inside a worker.
                all_topk_results = [future.result() for future in futures]

        reverse = collection_meta.metric_type == Types.MetricType.IP
        with self.tracer.start_span('do_merge', child_of=p_span):
            return self._do_merge(all_topk_results,
                                  topk,
                                  reverse=reverse,
                                  metadata=metadata)

J
Jin Hai 已提交
179 180
    def _create_collection(self, collection_schema):
        return self.router.connection().create_collection(collection_schema)
P
peng.xu 已提交
181 182

    @mark_grpc_method
    def CreateCollection(self, request, context):
        """Create the collection described by ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the create call.
        """
        parse_status, unpacks = Parser.parse_proto_CollectionSchema(request)
        if not parse_status.OK():
            return status_pb2.Status(error_code=parse_status.code,
                                     reason=parse_status.message)

        # The inner status delivered with the schema is currently not checked.
        _, schema = unpacks

        logger.info('CreateCollection {}'.format(schema['collection_name']))

        outcome = self._create_collection(schema)
        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

J
Jin Hai 已提交
202 203
    def _has_collection(self, collection_name, metadata=None):
        return self.router.connection(metadata=metadata).has_collection(collection_name)
P
peng.xu 已提交
204 205

    @mark_grpc_method
    def HasCollection(self, request, context):
        """Report whether the collection named in ``request`` exists.

        :return: ``milvus_pb2.BoolReply``; ``bool_reply`` is ``False`` when the
            request cannot be parsed.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.BoolReply(status=failure, bool_reply=False)

        logger.info('HasCollection {}'.format(name))

        lookup_meta = {'resp_class': milvus_pb2.BoolReply}
        outcome, exists = self._has_collection(name, metadata=lookup_meta)

        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        return milvus_pb2.BoolReply(status=reply_status, bool_reply=exists)

223 224
    @mark_grpc_method
    def CreatePartition(self, request, context):
        """Create the partition (collection name + tag) described by ``request``.

        :return: ``status_pb2.Status`` built from the outcome of the call.
        """
        name, tag = Parser.parse_proto_PartitionParam(request)
        conn = self.router.connection()
        outcome = conn.create_partition(name, tag)
        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

    @mark_grpc_method
    def DropPartition(self, request, context):
        """Drop the partition (collection name + tag) described by ``request``.

        :return: ``status_pb2.Status`` built from the outcome of the call.
        """
        name, tag = Parser.parse_proto_PartitionParam(request)
        conn = self.router.connection()
        outcome = conn.drop_partition(name, tag)
        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

    @mark_grpc_method
    def ShowPartitions(self, request, context):
        """List the partition tags of the collection named in ``request``.

        :return: ``milvus_pb2.PartitionList``; ``partition_tag_array`` is empty
            when the request cannot be parsed.
        """
        _status, _collection_name = Parser.parse_proto_CollectionName(request)
        if not _status.OK():
            # Use the same field name as the success path below; the previous
            # ``partition_array`` keyword did not match it.
            return milvus_pb2.PartitionList(status=status_pb2.Status(
                error_code=_status.code, reason=_status.message),
                partition_tag_array=[])

        logger.info('ShowPartitions {}'.format(_collection_name))

        _status, partition_array = self.router.connection().show_partitions(_collection_name)

        return milvus_pb2.PartitionList(status=status_pb2.Status(
            error_code=_status.code, reason=_status.message),
            partition_tag_array=[param.tag for param in partition_array])
253

J
Jin Hai 已提交
254 255
    def _drop_collection(self, collection_name):
        return self.router.connection().drop_collection(collection_name)
P
peng.xu 已提交
256 257

    @mark_grpc_method
    def DropCollection(self, request, context):
        """Drop the collection named in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the drop call.
        """
        _status, _collection_name = Parser.parse_proto_CollectionName(request)

        if not _status.OK():
            return status_pb2.Status(error_code=_status.code,
                                     reason=_status.message)

        logger.info('DropCollection {}'.format(_collection_name))

        _status = self._drop_collection(_collection_name)

        # Search caches collection descriptions in self.collection_meta. Forget
        # this entry so a collection recreated under the same name is described
        # again instead of being searched with stale metadata.
        self.collection_meta.pop(_collection_name, None)

        return status_pb2.Status(error_code=_status.code,
                                 reason=_status.message)

J
Jin Hai 已提交
272 273
    def _create_index(self, collection_name, index_type, param):
        return self.router.connection().create_index(collection_name, index_type, param)
P
peng.xu 已提交
274 275 276 277 278 279 280 281 282

    @mark_grpc_method
    def CreateIndex(self, request, context):
        """Build an index on the collection described by ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the index creation.
        """
        parse_status, unpacks = Parser.parse_proto_IndexParam(request)
        if parse_status.OK():
            name, index_type, index_param = unpacks

            logger.info('CreateIndex {}'.format(name))

            # TODO: interface create_collection incompleted
            outcome = self._create_index(name, index_type, index_param)
        else:
            outcome = parse_status

        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

    def _add_vectors(self, param, metadata=None):
        return self.router.connection(metadata=metadata).add_vectors(
            None, None, insert_param=param)

    @mark_grpc_method
    def Insert(self, request, context):
        """Insert the vectors carried by ``request``.

        :return: ``milvus_pb2.VectorIds`` with the call status and the ids
            returned by the insert.
        """
        logger.info('Insert')
        # TODO: Ths SDK interface add_vectors() could update, add a key 'row_id_array'
        insert_meta = {'resp_class': milvus_pb2.VectorIds}
        outcome, inserted_ids = self._add_vectors(metadata=insert_meta,
                                                  param=request)
        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        return milvus_pb2.VectorIds(status=reply_status,
                                    vector_id_array=inserted_ids)

    @mark_grpc_method
    def Search(self, request, context):
        """Run a top-k search for the query records in ``request``.

        The collection description is fetched once per collection name and then
        served from ``self.collection_meta``.

        :return: ``milvus_pb2.TopKQueryResult`` holding the merged ids and
            distances; ``row_num`` is 0 when no ids came back.
        :raises exceptions.SearchParamError: when ``extra_params`` is empty.
        :raises exceptions.InvalidTopKError: when ``topk`` is not within
            ``1..MAX_TOPK``.
        :raises exceptions.CollectionNotFoundError: when describing the
            collection fails.
        """
        metadata = {'resp_class': milvus_pb2.TopKQueryResult}

        collection_name = request.collection_name
        topk = request.topk

        if len(request.extra_params) == 0:
            raise exceptions.SearchParamError(message="Search parma loss", metadata=metadata)
        params = ujson.loads(str(request.extra_params[0].value))

        logger.info('Search {}: topk={} params={}'.format(
            collection_name, topk, params))

        if not (0 < topk <= self.MAX_TOPK):
            raise exceptions.InvalidTopKError(
                message='Invalid topk: {}'.format(topk), metadata=metadata)

        collection_meta = self.collection_meta.get(collection_name, None)
        if not collection_meta:
            conn = self.router.connection(metadata=metadata)
            status, info = conn.describe_collection(collection_name)
            if not status.OK():
                raise exceptions.CollectionNotFoundError(collection_name,
                                                         metadata=metadata)

            self.collection_meta[collection_name] = info
            collection_meta = info

        start = time.time()

        # Metric types from HAMMING upwards take binary query records; the
        # others take float vectors.
        if int(collection_meta.metric_type) >= MetricType.HAMMING.value:
            query_record_array = [bytes(record.binary_data)
                                  for record in request.query_record_array]
        else:
            query_record_array = [list(record.float_data)
                                  for record in request.query_record_array]

        partition_tags = getattr(request, "partition_tag_array", [])
        status, id_results, dis_results = self._do_query(context,
                                                         collection_name,
                                                         collection_meta,
                                                         query_record_array,
                                                         topk,
                                                         params,
                                                         partition_tags=partition_tags,
                                                         metadata=metadata)

        elapsed = time.time() - start
        logger.info('SearchVector takes: {}'.format(elapsed))

        reply_status = status_pb2.Status(error_code=status.error_code,
                                         reason=status.reason)
        row_num = len(request.query_record_array) if len(id_results) else 0
        return milvus_pb2.TopKQueryResult(status=reply_status,
                                          row_num=row_num,
                                          ids=id_results,
                                          distances=dis_results)

    @mark_grpc_method
    def SearchInFiles(self, request, context):
        """Not supported by this handler.

        :raises NotImplementedError: always.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception class;
        # calling it raised TypeError instead of signalling "not implemented".
        raise NotImplementedError()

J
Jin Hai 已提交
377 378
    def _describe_collection(self, collection_name, metadata=None):
        return self.router.connection(metadata=metadata).describe_collection(collection_name)
P
peng.xu 已提交
379 380

    @mark_grpc_method
    def DescribeCollection(self, request, context):
        """Describe the collection named in ``request``.

        :return: ``milvus_pb2.CollectionSchema``. The schema fields
            (``index_file_size``, ``dimension``, ``metric_type``) are filled
            only when the describe call succeeds; a parse failure yields a
            reply carrying just the status.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.CollectionSchema(status=failure)

        metadata = {'resp_class': milvus_pb2.CollectionSchema}

        logger.info('DescribeCollection {}'.format(name))
        outcome, description = self._describe_collection(metadata=metadata,
                                                         collection_name=name)

        succeeded = outcome.OK()
        reply_fields = {
            'collection_name': name,
            'status': status_pb2.Status(error_code=outcome.code,
                                        reason=outcome.message),
        }
        if succeeded:
            reply_fields.update(index_file_size=description.index_file_size,
                                dimension=description.dimension,
                                metric_type=description.metric_type)

        return milvus_pb2.CollectionSchema(**reply_fields)

J
Jin Hai 已提交
410 411
    def _collection_info(self, collection_name, metadata=None):
        return self.router.connection(metadata=metadata).collection_info(collection_name)
B
BossZou 已提交
412 413

    @mark_grpc_method
    def ShowCollectionInfo(self, request, context):
        """Return row counts per partition and segment for a collection.

        :return: ``milvus_pb2.CollectionInfo``. Statistics are filled only when
            the info call succeeds; otherwise the reply carries just the status.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.CollectionInfo(status=failure)

        metadata = {'resp_class': milvus_pb2.CollectionInfo}

        logger.info('ShowCollectionInfo {}'.format(name))
        outcome, info = self._collection_info(metadata=metadata, collection_name=name)

        succeeded = outcome.OK()
        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        if not succeeded:
            return milvus_pb2.CollectionInfo(status=reply_status)

        reply = milvus_pb2.CollectionInfo(status=reply_status,
                                          total_row_count=info.count)

        for partition in info.partitions_stat:
            partition_reply = milvus_pb2.PartitionStat(
                tag=partition.tag,
                total_row_count=partition.count)

            for segment in partition.segments_stat:
                partition_reply.segments_stat.add(
                    segment_name=segment.segment_name,
                    row_count=segment.count,
                    index_name=segment.index_name,
                    data_size=segment.data_size)

            reply.partitions_stat.append(partition_reply)

        return reply

J
Jin Hai 已提交
454
    def _count_collection(self, collection_name, metadata=None):
P
peng.xu 已提交
455
        return self.router.connection(
J
Jin Hai 已提交
456
            metadata=metadata).count_collection(collection_name)
P
peng.xu 已提交
457 458

    @mark_grpc_method
    def CountCollection(self, request, context):
        """Return the row count of the collection named in ``request``.

        :return: ``milvus_pb2.CollectionRowCount``; ``collection_row_count`` is
            ``-1`` when the count call does not yield an ``int``.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.CollectionRowCount(status=failure)

        logger.info('CountCollection {}'.format(name))

        count_meta = {'resp_class': milvus_pb2.CollectionRowCount}
        outcome, count = self._count_collection(name, metadata=count_meta)

        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        row_count = count if isinstance(count, int) else -1
        return milvus_pb2.CollectionRowCount(status=reply_status,
                                             collection_row_count=row_count)
P
peng.xu 已提交
477 478 479 480

    def _get_server_version(self, metadata=None):
        return self.router.connection(metadata=metadata).server_version()

B
BossZou 已提交
481 482 483
    def _cmd(self, cmd, metadata=None):
        return self.router.connection(metadata=metadata)._cmd(cmd)

P
peng.xu 已提交
484 485 486 487 488 489 490 491 492 493 494
    @mark_grpc_method
    def Cmd(self, request, context):
        """Execute the command string carried by ``request``.

        ``conn_stats`` is answered locally from ``self.router.readonly_topo``;
        every other command is passed to ``_cmd``.

        :return: ``milvus_pb2.StringReply``.
        """
        parse_status, command = Parser.parse_proto_Command(request)
        logger.info('Cmd: {}'.format(command))

        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.StringReply(status=failure)

        metadata = {'resp_class': milvus_pb2.StringReply}

        if command == 'conn_stats':
            stats = self.router.readonly_topo.stats()
            success = status_pb2.Status(error_code=status_pb2.SUCCESS)
            return milvus_pb2.StringReply(status=success,
                                          string_reply=json.dumps(stats, indent=2))

        outcome, reply_text = self._cmd(command, metadata=metadata)

        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        return milvus_pb2.StringReply(status=reply_status,
                                      string_reply=reply_text)

J
Jin Hai 已提交
512
    def _show_collections(self, metadata=None):
B
BossZou 已提交
513
        return self.router.connection(metadata=metadata).show_collections()
P
peng.xu 已提交
514 515

    @mark_grpc_method
    def ShowCollections(self, request, context):
        """List the names of all collections.

        :return: ``milvus_pb2.CollectionNameList`` with the call status and the
            collection names.
        """
        logger.info('ShowCollections')
        # Every other handler in this class passes the reply type it returns as
        # ``resp_class``; this one passed ``CollectionName`` while returning a
        # ``CollectionNameList``.
        # NOTE(review): resp_class is presumably used to build error replies of
        # the right type - confirm against the router / error handlers.
        metadata = {'resp_class': milvus_pb2.CollectionNameList}
        _status, _results = self._show_collections(metadata=metadata)

        return milvus_pb2.CollectionNameList(status=status_pb2.Status(
            error_code=_status.code, reason=_status.message),
            collection_names=_results)
P
peng.xu 已提交
524

J
Jin Hai 已提交
525 526
    def _preload_collection(self, collection_name):
        return self.router.connection().preload_collection(collection_name)
P
peng.xu 已提交
527 528

    @mark_grpc_method
    def PreloadCollection(self, request, context):
        """Preload the collection named in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the preload call.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if parse_status.OK():
            logger.info('PreloadCollection {}'.format(name))
            outcome = self._preload_collection(name)
        else:
            outcome = parse_status

        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

J
Jin Hai 已提交
541 542
    def _describe_index(self, collection_name, metadata=None):
        return self.router.connection(metadata=metadata).describe_index(collection_name)
P
peng.xu 已提交
543 544 545

    @mark_grpc_method
    def DescribeIndex(self, request, context):
        """Describe the index of the collection named in ``request``.

        :return: ``milvus_pb2.IndexParam``. When no index description comes
            back only the status is filled; otherwise the reply also carries
            the collection name, the index type and a ``params`` entry holding
            the JSON-encoded index parameters.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if not parse_status.OK():
            failure = status_pb2.Status(error_code=parse_status.code,
                                        reason=parse_status.message)
            return milvus_pb2.IndexParam(status=failure)

        metadata = {'resp_class': milvus_pb2.IndexParam}

        logger.info('DescribeIndex {}'.format(name))
        outcome, index_param = self._describe_index(collection_name=name,
                                                    metadata=metadata)

        if not index_param:
            return milvus_pb2.IndexParam(status=status_pb2.Status(
                error_code=outcome.code, reason=outcome.message))

        index_type = index_param._index_type
        reply_status = status_pb2.Status(error_code=outcome.code,
                                         reason=outcome.message)
        reply = milvus_pb2.IndexParam(status=reply_status,
                                      collection_name=name,
                                      index_type=index_type)

        reply.extra_params.add(key='params', value=ujson.dumps(index_param._params))
        return reply

J
Jin Hai 已提交
571 572
    def _get_vector_by_id(self, collection_name, vec_id, metadata):
        return self.router.connection(metadata=metadata).get_vector_by_id(collection_name, vec_id)
B
BossZou 已提交
573 574 575 576 577 578 579 580 581 582

    @mark_grpc_method
    def GetVectorByID(self, request, context):
        """Fetch one vector by id from the collection described by ``request``.

        :return: ``milvus_pb2.VectorData`` on every path. ``vector_data`` is
            filled only when a vector comes back: as ``binary_data`` for
            ``bytes``, as ``float_data`` otherwise.
        """
        _status, unpacks = Parser.parse_proto_VectorIdentity(request)
        if not _status.OK():
            # Wrap the failure in VectorData, like every other return of this
            # handler; this path previously returned a bare Status message.
            return milvus_pb2.VectorData(status=status_pb2.Status(
                error_code=_status.code, reason=_status.message), )

        metadata = {'resp_class': milvus_pb2.VectorData}

        _collection_name, _id = unpacks
        logger.info('GetVectorByID {}'.format(_collection_name))
        _status, vector = self._get_vector_by_id(_collection_name, _id, metadata)

        if not vector:
            return milvus_pb2.VectorData(status=status_pb2.Status(
                error_code=_status.code, reason=_status.message), )

        if isinstance(vector, bytes):
            records = milvus_pb2.RowRecord(binary_data=vector)
        else:
            records = milvus_pb2.RowRecord(float_data=vector)

        return milvus_pb2.VectorData(status=status_pb2.Status(
            error_code=_status.code, reason=_status.message),
            vector_data=records
        )

J
Jin Hai 已提交
601 602
    def _get_vector_ids(self, collection_name, segment_name, metadata):
        return self.router.connection(metadata=metadata).get_vector_ids(collection_name, segment_name)
B
BossZou 已提交
603 604 605 606 607 608 609 610 611 612 613

    @mark_grpc_method
    def GetVectorIDs(self, request, context):
        """List the vector ids stored in one segment of a collection.

        :return: ``milvus_pb2.VectorIds`` on every path; ``vector_id_array`` is
            filled only when ids come back.
        """
        _status, unpacks = Parser.parse_proto_GetVectorIDsParam(request)

        if not _status.OK():
            # Wrap the failure in VectorIds, like every other return of this
            # handler; this path previously returned a bare Status message.
            return milvus_pb2.VectorIds(status=status_pb2.Status(
                error_code=_status.code, reason=_status.message), )

        metadata = {'resp_class': milvus_pb2.VectorIds}

        _collection_name, _segment_name = unpacks
        logger.info('GetVectorIDs {}'.format(_collection_name))
        _status, ids = self._get_vector_ids(_collection_name, _segment_name, metadata)

        if not ids:
            return milvus_pb2.VectorIds(status=status_pb2.Status(
                error_code=_status.code, reason=_status.message), )

        return milvus_pb2.VectorIds(status=status_pb2.Status(
            error_code=_status.code, reason=_status.message),
            vector_id_array=ids
        )

J
Jin Hai 已提交
627 628
    def _delete_by_id(self, collection_name, id_array):
        return self.router.connection().delete_by_id(collection_name, id_array)
B
BossZou 已提交
629 630 631 632 633 634 635 636 637 638

    @mark_grpc_method
    def DeleteByID(self, request, context):
        """Delete the vectors whose ids are listed in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the delete call.
        """
        _status, unpacks = Parser.parse_proto_DeleteByIDParam(request)

        if not _status.OK():
            # Log through the module logger, like the rest of this class,
            # instead of the root ``logging`` module.
            logger.error('DeleteByID {}'.format(_status.message))
            return status_pb2.Status(error_code=_status.code,
                                     reason=_status.message)

        _collection_name, _ids = unpacks
        logger.info('DeleteByID {}'.format(_collection_name))
        _status = self._delete_by_id(_collection_name, _ids)

        return status_pb2.Status(error_code=_status.code,
                                 reason=_status.message)
P
peng.xu 已提交
645

J
Jin Hai 已提交
646 647
    def _drop_index(self, collection_name):
        return self.router.connection().drop_index(collection_name)
P
peng.xu 已提交
648 649 650

    @mark_grpc_method
    def DropIndex(self, request, context):
        """Drop the index of the collection named in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the drop call.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if parse_status.OK():
            logger.info('DropIndex {}'.format(name))
            outcome = self._drop_index(name)
        else:
            outcome = parse_status

        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)
B
BossZou 已提交
661

J
Jin Hai 已提交
662 663
    def _flush(self, collection_names):
        return self.router.connection().flush(collection_names)
B
BossZou 已提交
664 665 666

    @mark_grpc_method
    def Flush(self, request, context):
        """Flush the collections listed in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the flush call.
        """
        parse_status, names = Parser.parse_proto_FlushParam(request)
        if parse_status.OK():
            logger.info('Flush {}'.format(names))
            outcome = self._flush(names)
        else:
            outcome = parse_status

        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)

J
Jin Hai 已提交
678 679
    def _compact(self, collection_name):
        return self.router.connection().compact(collection_name)
B
BossZou 已提交
680 681 682

    @mark_grpc_method
    def Compact(self, request, context):
        """Compact the collection named in ``request``.

        :return: ``status_pb2.Status`` carrying either the parse failure or the
            outcome of the compact call.
        """
        parse_status, name = Parser.parse_proto_CollectionName(request)
        if parse_status.OK():
            logger.info('Compact {}'.format(name))
            outcome = self._compact(name)
        else:
            outcome = parse_status

        return status_pb2.Status(error_code=outcome.code,
                                 reason=outcome.message)