未验证 提交 c60babeb 编写于 作者: B BossZou 提交者: GitHub

Upgrade mishards to 0.8.0 (#1933)

* update grpc server of milvus & rename table name to collection
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* update changlog
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* [skip ci] Skip CI
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* [skip ci] Update changlog
Signed-off-by: NYhz <yinghao.zou@zilliz.com>
上级 8d5ab34a
......@@ -32,6 +32,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1885 Optimize knowhere unittest
- \#1886 Refactor log on search and insert request
- \#1897 Heap pop and push can be realized by heap_swap_top
- \#1930 Upgrade mishards to 0.8.0
## Task
......
......@@ -4,7 +4,7 @@ CONNECT_ERROR_CODE = 10001
CONNECTTION_NOT_FOUND_CODE = 10002
DB_ERROR_CODE = 10003
TABLE_NOT_FOUND_CODE = 20001
COLLECTION_NOT_FOUND_CODE = 20001
INVALID_ARGUMENT_CODE = 20002
INVALID_DATE_RANGE_CODE = 20003
INVALID_TOPK_CODE = 20004
......
......@@ -30,23 +30,23 @@ def resp_handler(err, error_code):
ids=[],
distances=[])
if resp_class == milvus_pb2.TableRowCount:
return resp_class(status=status, table_row_count=-1)
if resp_class == milvus_pb2.CollectionRowCount:
return resp_class(status=status, collection_row_count=-1)
if resp_class == milvus_pb2.TableName:
return resp_class(status=status, table_name=[])
if resp_class == milvus_pb2.CollectionName:
return resp_class(status=status, collection_name=[])
if resp_class == milvus_pb2.StringReply:
return resp_class(status=status, string_reply='')
if resp_class == milvus_pb2.TableSchema:
return milvus_pb2.TableSchema(
if resp_class == milvus_pb2.CollectionSchema:
return milvus_pb2.CollectionSchema(
status=status
)
if resp_class == milvus_pb2.IndexParam:
return milvus_pb2.IndexParam(
table_name=milvus_pb2.TableName(
collection_name=milvus_pb2.CollectionName(
status=status
)
)
......@@ -55,10 +55,10 @@ def resp_handler(err, error_code):
return status
@server.errorhandler(exceptions.TableNotFoundError)
def TableNotFoundErrorHandler(err):
@server.errorhandler(exceptions.CollectionNotFoundError)
def CollectionNotFoundErrorHandler(err):
logger.error(err)
return resp_handler(err, status_pb2.TABLE_NOT_EXISTS)
return resp_handler(err, status_pb2.COLLECTION_NOT_EXISTS)
@server.errorhandler(exceptions.InvalidTopKError)
......
......@@ -22,8 +22,8 @@ class DBError(BaseException):
code = codes.DB_ERROR_CODE
class TableNotFoundError(BaseException):
code = codes.TABLE_NOT_FOUND_CODE
class CollectionNotFoundError(BaseException):
code = codes.COLLECTION_NOT_FOUND_CODE
class InvalidTopKError(BaseException):
......
......@@ -20,25 +20,25 @@ class GrpcArgsParser(object):
@classmethod
@error_status
def parse_proto_TableSchema(cls, param):
_table_schema = {
'collection_name': param.table_name,
def parse_proto_CollectionSchema(cls, param):
_collection_schema = {
'collection_name': param.collection_name,
'dimension': param.dimension,
'index_file_size': param.index_file_size,
'metric_type': param.metric_type
}
return param.status, _table_schema
return param.status, _collection_schema
@classmethod
@error_status
def parse_proto_TableName(cls, param):
return param.table_name
def parse_proto_CollectionName(cls, param):
return param.collection_name
@classmethod
@error_status
def parse_proto_FlushParam(cls, param):
return list(param.table_name_array)
return list(param.collection_name_array)
@classmethod
@error_status
......@@ -53,7 +53,7 @@ class GrpcArgsParser(object):
@classmethod
@error_status
def parse_proto_IndexParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_index_type = param.index_type
_index_param = {}
......@@ -61,7 +61,7 @@ class GrpcArgsParser(object):
if params.key == 'params':
_index_param = ujson.loads(str(params.value))
return _table_name, _index_type, _index_param
return _collection_name, _index_type, _index_param
@classmethod
@error_status
......@@ -77,15 +77,15 @@ class GrpcArgsParser(object):
@classmethod
def parse_proto_PartitionParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_tag = param.tag
return _table_name, _tag
return _collection_name, _tag
@classmethod
@error_status
def parse_proto_SearchParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_topk = param.topk
if len(param.extra_params) == 0:
......@@ -102,28 +102,28 @@ class GrpcArgsParser(object):
else:
raise Exception("Search argument parse error: record array is empty")
return _table_name, _query_record_array, _topk, _params
return _collection_name, _query_record_array, _topk, _params
@classmethod
@error_status
def parse_proto_DeleteByIDParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_id_array = list(param.id_array)
return _table_name, _id_array
return _collection_name, _id_array
@classmethod
@error_status
def parse_proto_VectorIdentity(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_id = param.id
return _table_name, _id
return _collection_name, _id
@classmethod
@error_status
def parse_proto_GetVectorIDsParam(cls, param):
_table__name = param.table_name
_collection__name = param.collection_name
_segment_name = param.segment_name
return _table__name, _segment_name
return _collection__name, _segment_name
# class GrpcArgsWrapper(object):
# @classmethod
# def proto_TableName(cls):
# def proto_CollectionName(cls):
......@@ -6,7 +6,7 @@ class RouterMixin:
self.writable_topo = writable_topo
self.readonly_topo = readonly_topo
def routing(self, table_name, metadata=None, **kwargs):
def routing(self, collection_name, metadata=None, **kwargs):
raise NotImplemented()
def connection(self, metadata=None):
......
......@@ -16,53 +16,53 @@ class Factory(RouterMixin):
super(Factory, self).__init__(writable_topo=writable_topo,
readonly_topo=readonly_topo)
def routing(self, table_name, partition_tags=None, metadata=None, **kwargs):
def routing(self, collection_name, partition_tags=None, metadata=None, **kwargs):
range_array = kwargs.pop('range_array', None)
return self._route(table_name, range_array, partition_tags, metadata, **kwargs)
return self._route(collection_name, range_array, partition_tags, metadata, **kwargs)
def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs):
def _route(self, collection_name, range_array, partition_tags=None, metadata=None, **kwargs):
# PXU TODO: Implement Thread-local Context
# PXU TODO: Session life mgt
if not partition_tags:
cond = and_(
or_(Tables.table_id == table_name, Tables.owner_table == table_name),
or_(Tables.table_id == collection_name, Tables.owner_table == collection_name),
Tables.state != Tables.TO_DELETE)
else:
# TODO: collection default partition is '_default'
cond = and_(Tables.state != Tables.TO_DELETE,
Tables.owner_table == table_name,
Tables.owner_table == collection_name,
Tables.partition_tag.in_(partition_tags))
if '_default' in partition_tags:
default_par_cond = and_(Tables.table_id == table_name, Tables.state != Tables.TO_DELETE)
default_par_cond = and_(Tables.table_id == collection_name, Tables.state != Tables.TO_DELETE)
cond = or_(cond, default_par_cond)
try:
tables = db.Session.query(Tables).filter(cond).all()
collections = db.Session.query(Tables).filter(cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not tables:
logger.error("Cannot find table {} / {} in metadata".format(table_name, partition_tags))
raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata)
if not collections:
logger.error("Cannot find collection {} / {} in metadata".format(collection_name, partition_tags))
raise exceptions.CollectionNotFoundError('{}:{}'.format(collection_name, partition_tags), metadata=metadata)
table_list = [str(table.table_id) for table in tables]
collection_list = [str(collection.table_id) for collection in collections]
file_type_cond = or_(
TableFiles.file_type == TableFiles.FILE_TYPE_RAW,
TableFiles.file_type == TableFiles.FILE_TYPE_TO_INDEX,
TableFiles.file_type == TableFiles.FILE_TYPE_INDEX,
)
file_cond = and_(file_type_cond, TableFiles.table_id.in_(table_list))
file_cond = and_(file_type_cond, TableFiles.table_id.in_(collection_list))
try:
files = db.Session.query(TableFiles).filter(file_cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not files:
logger.warning("Table file is empty. {}".format(table_list))
# logger.error("Cannot find table file id {} / {} in metadata".format(table_name, partition_tags))
# raise exceptions.TableNotFoundError('Table file id not found. {}:{}'.format(table_name, partition_tags),
# metadata=metadata)
logger.warning("Collection file is empty. {}".format(collection_list))
# logger.error("Cannot find collection file id {} / {} in metadata".format(collection_name, partition_tags))
# raise exceptions.CollectionNotFoundError('Collection file id not found. {}:{}'.format(collection_name, partition_tags),
# metadata=metadata)
db.remove_session()
......
此差异已折叠。
......@@ -12,7 +12,7 @@ else:
env.read_env()
SERVER_VERSIONS = ['0.7.0', '0.7.1']
SERVER_VERSIONS = ['0.8.0']
DEBUG = env.bool('DEBUG', False)
MAX_RETRY = env.int('MAX_RETRY', 3)
......
......@@ -7,7 +7,7 @@ import faker
import inspect
from milvus import Milvus
from milvus.client.types import Status, IndexType, MetricType
from milvus.client.abstract import IndexParam, TableSchema
from milvus.client.abstract import IndexParam, CollectionSchema
from milvus.grpc_gen import status_pb2, milvus_pb2
from mishards import db, create_app, settings
from mishards.service_handler import ServiceHandler
......@@ -43,55 +43,55 @@ class TestServer:
assert not status.OK()
def test_drop_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
ServiceHandler._drop_index = mock.MagicMock(return_value=OK)
status = self.client.drop_index(table_name)
status = self.client.drop_index(collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.drop_index(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.drop_index(collection_name)
assert not status.OK()
def test_describe_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
index_type = IndexType.FLAT
nlist = 1
index_param = IndexParam(table_name=table_name,
params = {'nlist': 1}
index_param = IndexParam(collection_name=collection_name,
index_type=index_type,
nlist=nlist)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
params=params)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._describe_index = mock.MagicMock(
return_value=(OK, index_param))
status, ret = self.client.describe_index(table_name)
status, ret = self.client.describe_index(collection_name)
assert status.OK()
assert ret._table_name == index_param._table_name
assert ret._collection_name == index_param._collection_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, _ = self.client.describe_index(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, _ = self.client.describe_index(collection_name)
assert not status.OK()
def test_preload(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._preload_table = mock.MagicMock(return_value=OK)
status = self.client.preload_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._preload_collection = mock.MagicMock(return_value=OK)
status = self.client.preload_collection(collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.preload_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.preload_collection(collection_name)
assert not status.OK()
@pytest.mark.skip
def test_delete_by_range(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
unpacked = table_name, datetime.datetime.today(
unpacked = collection_name, datetime.datetime.today(
), datetime.datetime.today()
Parser.parse_proto_DeleteByRangeParam = mock.MagicMock(
......@@ -107,122 +107,122 @@ class TestServer:
*unpacked)
assert not status.OK()
def test_count_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_count_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
count = random.randint(100, 200)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._count_table = mock.MagicMock(return_value=(OK, count))
status, ret = self.client.get_table_row_count(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._count_collection = mock.MagicMock(return_value=(OK, count))
status, ret = self.client.count_collection(collection_name)
assert status.OK()
assert ret == count
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, _ = self.client.get_table_row_count(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, _ = self.client.count_collection(collection_name)
assert not status.OK()
def test_show_tables(self, started_app):
tables = ['t1', 't2']
ServiceHandler._show_tables = mock.MagicMock(return_value=(OK, tables))
status, ret = self.client.show_tables()
def test_show_collections(self, started_app):
collections = ['t1', 't2']
ServiceHandler._show_collections = mock.MagicMock(return_value=(OK, collections))
status, ret = self.client.show_collections()
assert status.OK()
assert ret == tables
assert ret == collections
def test_describe_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_describe_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
dimension = 128
nlist = 1
table_schema = TableSchema(table_name=table_name,
collection_schema = CollectionSchema(collection_name=collection_name,
index_file_size=100,
metric_type=MetricType.L2,
dimension=dimension)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_schema.table_name))
ServiceHandler._describe_table = mock.MagicMock(
return_value=(OK, table_schema))
status, _ = self.client.describe_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_schema.collection_name))
ServiceHandler._describe_collection = mock.MagicMock(
return_value=(OK, collection_schema))
status, _ = self.client.describe_collection(collection_name)
assert status.OK()
ServiceHandler._describe_table = mock.MagicMock(
return_value=(BAD, table_schema))
status, _ = self.client.describe_table(table_name)
ServiceHandler._describe_collection = mock.MagicMock(
return_value=(BAD, collection_schema))
status, _ = self.client.describe_collection(collection_name)
assert not status.OK()
Parser.parse_proto_TableName = mock.MagicMock(return_value=(BAD,
Parser.parse_proto_CollectionName = mock.MagicMock(return_value=(BAD,
'cmd'))
status, ret = self.client.describe_table(table_name)
status, ret = self.client.describe_collection(collection_name)
assert not status.OK()
def test_insert(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
vectors = [[random.random() for _ in range(16)] for _ in range(10)]
ids = [random.randint(1000000, 20000000) for _ in range(10)]
ServiceHandler._add_vectors = mock.MagicMock(return_value=(OK, ids))
status, ret = self.client.add_vectors(
table_name=table_name, records=vectors)
collection_name=collection_name, records=vectors)
assert status.OK()
assert ids == ret
def test_create_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
unpacks = table_name, None
collection_name = inspect.currentframe().f_code.co_name
unpacks = collection_name, None
Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(OK,
unpacks))
ServiceHandler._create_index = mock.MagicMock(return_value=OK)
status = self.client.create_index(table_name=table_name)
status = self.client.create_index(collection_name=collection_name)
assert status.OK()
Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(BAD,
None))
status = self.client.create_index(table_name=table_name)
status = self.client.create_index(collection_name=collection_name)
assert not status.OK()
def test_drop_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_drop_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._delete_table = mock.MagicMock(return_value=OK)
status = self.client.delete_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._drop_collection = mock.MagicMock(return_value=OK)
status = self.client.drop_collection(collection_name=collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.delete_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.drop_collection(collection_name=collection_name)
assert not status.OK()
def test_has_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_has_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._has_table = mock.MagicMock(return_value=(OK, True))
has = self.client.has_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._has_collection = mock.MagicMock(return_value=(OK, True))
has = self.client.has_collection(collection_name=collection_name)
assert has
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, has = self.client.has_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, has = self.client.has_collection(collection_name=collection_name)
assert not status.OK()
assert not has
def test_create_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_create_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
dimension = 128
table_schema = dict(table_name=table_name,
collection_schema = dict(collection_name=collection_name,
index_file_size=100,
metric_type=MetricType.L2,
dimension=dimension)
ServiceHandler._create_table = mock.MagicMock(return_value=OK)
status = self.client.create_table(table_schema)
ServiceHandler._create_collection = mock.MagicMock(return_value=OK)
status = self.client.create_collection(collection_schema)
assert status.OK()
Parser.parse_proto_TableSchema = mock.MagicMock(return_value=(BAD,
Parser.parse_proto_CollectionSchema = mock.MagicMock(return_value=(BAD,
None))
status = self.client.create_table(table_schema)
status = self.client.create_collection(collection_schema)
assert not status.OK()
def random_data(self, n, dimension):
......@@ -230,18 +230,18 @@ class TestServer:
@pytest.mark.skip
def test_search(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
to_index_cnt = random.randint(10, 20)
table = TablesFactory(table_id=table_name, state=Tables.NORMAL)
collection = TablesFactory(collection_id=collection_name, state=Tables.NORMAL)
to_index_files = TableFilesFactory.create_batch(
to_index_cnt, table=table, file_type=TableFiles.FILE_TYPE_TO_INDEX)
to_index_cnt, collection=collection, file_type=TableFiles.FILE_TYPE_TO_INDEX)
topk = random.randint(5, 10)
nq = random.randint(5, 10)
param = {
'table_name': table_name,
'query_records': self.random_data(nq, table.dimension),
'collection_name': collection_name,
'query_records': self.random_data(nq, collection.dimension),
'top_k': topk,
'nprobe': 2049
'params': {'nprobe': 2049}
}
result = [
......@@ -255,23 +255,23 @@ class TestServer:
error_code=status_pb2.SUCCESS, reason="Success"),
topk_query_result=result)
table_schema = TableSchema(table_name=table_name,
index_file_size=table.index_file_size,
metric_type=table.metric_type,
dimension=table.dimension)
collection_schema = CollectionSchema(collection_name=collection_name,
index_file_size=collection.index_file_size,
metric_type=collection.metric_type,
dimension=collection.dimension)
status, _ = self.client.search_vectors(**param)
assert status.code == Status.ILLEGAL_ARGUMENT
param['nprobe'] = 2048
param['params']['nprobe'] = 2048
RouterMixin.connection = mock.MagicMock(return_value=Milvus())
RouterMixin.query_conn.conn = mock.MagicMock(return_value=Milvus())
Milvus.describe_table = mock.MagicMock(return_value=(BAD,
table_schema))
Milvus.describe_collection = mock.MagicMock(return_value=(BAD,
collection_schema))
status, ret = self.client.search_vectors(**param)
assert status.code == Status.TABLE_NOT_EXISTS
assert status.code == Status.COLLECTION_NOT_EXISTS
Milvus.describe_table = mock.MagicMock(return_value=(OK, table_schema))
Milvus.describe_collection = mock.MagicMock(return_value=(OK, collection_schema))
Milvus.search_vectors_in_files = mock.MagicMock(
return_value=mock_results)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册