From 0a134dab82085ec6f775ff994ccd0a587de5a979 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 9 Dec 2019 10:55:32 +0800 Subject: [PATCH] Mishards synced to 0.6.0 release (#725) * [skip ci](shards): update to support milvus 0.6.0 * [skip ci](shards): add show partitions * [skip ci](shards): add drop partition * [skip ci](shards): add create partition * [skip ci](shards): update search for partition * [skip ci](shards): fix search partion bugs * [skip ci](shards): update all in one demo * [skip ci](shards/doc): update image in readme --- shards/README.md | 2 +- shards/README_CN.md | 2 +- shards/all_in_one/all_in_one.yml | 8 ++-- shards/all_in_one/ro_server.yml | 2 +- shards/all_in_one/wr_server.yml | 2 +- shards/all_in_one_with_mysql/all_in_one.yml | 12 ++--- shards/all_in_one_with_mysql/ro_server.yml | 3 +- shards/all_in_one_with_mysql/wr_server.yml | 3 +- shards/mishards/connections.py | 6 +-- .../mishards/grpc_utils/grpc_args_parser.py | 8 ++++ shards/mishards/models.py | 4 +- .../plugins/file_based_hash_ring_router.py | 45 ++++++++++++------- shards/mishards/service_handler.py | 37 +++++++++++++++ shards/requirements.txt | 3 +- 14 files changed, 100 insertions(+), 37 deletions(-) diff --git a/shards/README.md b/shards/README.md index 1e0b000a..719afc28 100644 --- a/shards/README.md +++ b/shards/README.md @@ -54,7 +54,7 @@ Follow below steps to start a standalone Milvus instance with Mishards from sour 3. Start Milvus server. ```shell - $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus + $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd ``` 4. Update path permissions. diff --git a/shards/README_CN.md b/shards/README_CN.md index 98264b20..ab87fb5b 100644 --- a/shards/README_CN.md +++ b/shards/README_CN.md @@ -48,7 +48,7 @@ Python 版本为3.6及以上。 3. 启动 Milvus 服务。 ```shell - $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus + $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd ``` 4. 更改目录权限。 diff --git a/shards/all_in_one/all_in_one.yml b/shards/all_in_one/all_in_one.yml index 75a33400..dd550553 100644 --- a/shards/all_in_one/all_in_one.yml +++ b/shards/all_in_one/all_in_one.yml @@ -3,17 +3,17 @@ services: milvus_wr: runtime: nvidia restart: always - image: milvusdb/milvus + image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd volumes: - - /tmp/milvus/db:/opt/milvus/db + - /tmp/milvus/db:/var/lib/milvus/db - ./wr_server.yml:/opt/milvus/conf/server_config.yaml milvus_ro: runtime: nvidia restart: always - image: milvusdb/milvus + image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd volumes: - - /tmp/milvus/db:/opt/milvus/db + - /tmp/milvus/db:/var/lib/milvus/db - ./ro_server.yml:/opt/milvus/conf/server_config.yaml jaeger: diff --git a/shards/all_in_one/ro_server.yml b/shards/all_in_one/ro_server.yml index 09857ee9..00a8f611 100644 --- a/shards/all_in_one/ro_server.yml +++ b/shards/all_in_one/ro_server.yml @@ -5,7 +5,7 @@ server_config: time_zone: UTC+8 db_config: - primary_path: /opt/milvus # path used to store data and meta + primary_path: /var/lib/milvus # path used to store data and meta secondary_path: # path used to store data only, split by semicolon backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database diff --git a/shards/all_in_one/wr_server.yml b/shards/all_in_one/wr_server.yml index 5d7d855c..cf0d06e2 100644 --- a/shards/all_in_one/wr_server.yml +++ b/shards/all_in_one/wr_server.yml @@ -5,7 +5,7 @@ server_config: time_zone: UTC+8 db_config: - primary_path: /opt/milvus # path used to store data and meta + primary_path: /var/lib/milvus # path used to store data and meta secondary_path: # path used to store data only, split by semicolon backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database diff --git a/shards/all_in_one_with_mysql/all_in_one.yml b/shards/all_in_one_with_mysql/all_in_one.yml index 6619635f..5343f18e 100644 --- a/shards/all_in_one_with_mysql/all_in_one.yml +++ b/shards/all_in_one_with_mysql/all_in_one.yml @@ -18,10 +18,10 @@ services: milvus_wr: runtime: nvidia restart: always - image: milvusdb/milvus + image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd volumes: - - /tmp/milvus/db:/opt/milvus/db - - ./wr_server.yml:/opt/milvus/conf/server_config.yaml + - /tmp/milvus/db:/var/lib/milvus/db + - ./wr_server.yml:/var/lib/milvus/conf/server_config.yaml depends_on: milvus-mysql: condition: service_healthy @@ -29,10 +29,10 @@ services: milvus_ro: runtime: nvidia restart: always - image: milvusdb/milvus + image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd volumes: - - /tmp/milvus/db:/opt/milvus/db - - ./ro_server.yml:/opt/milvus/conf/server_config.yaml + - /tmp/milvus/db:/var/lib/milvus/db + - ./ro_server.yml:/var/lib/milvus/conf/server_config.yaml depends_on: - milvus-mysql - milvus_wr diff --git a/shards/all_in_one_with_mysql/ro_server.yml b/shards/all_in_one_with_mysql/ro_server.yml index 768fafb1..4355fb5a 100644 --- a/shards/all_in_one_with_mysql/ro_server.yml +++ b/shards/all_in_one_with_mysql/ro_server.yml @@ -1,3 +1,4 @@ +version: 0.1 server_config: address: 0.0.0.0 # milvus server ip address (IPv4) port: 19530 # port range: 1025 ~ 65534 @@ -5,7 +6,7 @@ server_config: time_zone: UTC+8 db_config: - primary_path: /opt/milvus # path used to store data and meta + primary_path: /var/lib/milvus # path used to store data and meta secondary_path: # path used to store data only, split by semicolon backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus diff --git a/shards/all_in_one_with_mysql/wr_server.yml b/shards/all_in_one_with_mysql/wr_server.yml index b2332532..9cbb8f9c 100644 --- a/shards/all_in_one_with_mysql/wr_server.yml +++ b/shards/all_in_one_with_mysql/wr_server.yml @@ -1,3 +1,4 @@ +version: 0.1 server_config: address: 0.0.0.0 # milvus server ip address (IPv4) port: 19530 # port range: 1025 ~ 65534 @@ -5,7 +6,7 @@ server_config: time_zone: UTC+8 db_config: - primary_path: /opt/milvus # path used to store data and meta + primary_path: /var/lib/milvus # path used to store data and meta secondary_path: # path used to store data only, split by semicolon backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus # URI format: dialect://username:password@host:port/database diff --git a/shards/mishards/connections.py b/shards/mishards/connections.py index 50e214ec..459f5484 100644 --- a/shards/mishards/connections.py +++ b/shards/mishards/connections.py @@ -2,7 +2,7 @@ import logging import threading from functools import wraps from milvus import Milvus -from milvus.client.hooks import BaseaSearchHook +from milvus.client.hooks import BaseSearchHook from mishards import (settings, exceptions) from utils import singleton @@ -10,7 +10,7 @@ from utils import singleton logger = logging.getLogger(__name__) -class Searchook(BaseaSearchHook): +class Searchook(BaseSearchHook): def on_response(self, *args, **kwargs): return True @@ -27,7 +27,7 @@ class Connection: self.on_retry_func = kwargs.get('on_retry_func', None) # define search hook - self.conn._set_hook(search_in_file=Searchook()) + self.conn.set_hook(search_in_file=Searchook()) # self._connect() def __str__(self): diff --git a/shards/mishards/grpc_utils/grpc_args_parser.py b/shards/mishards/grpc_utils/grpc_args_parser.py index 03929980..59dec55a 100644 --- a/shards/mishards/grpc_utils/grpc_args_parser.py +++ b/shards/mishards/grpc_utils/grpc_args_parser.py @@ -76,6 +76,14 @@ class GrpcArgsParser(object): def parse_proto_RowRecord(cls, param): return list(param.vector_data) + @classmethod + def parse_proto_PartitionParam(cls, param): + _table_name = param.table_name + _partition_name = param.partition_name + _tag = param.tag + + return _table_name, _partition_name, _tag + @classmethod @error_status def parse_proto_SearchParam(cls, param): diff --git a/shards/mishards/models.py b/shards/mishards/models.py index 4b6c8f9e..f8c6b55b 100644 --- a/shards/mishards/models.py +++ b/shards/mishards/models.py @@ -47,6 +47,9 @@ class Tables(db.Model): id = Column(BigInteger, primary_key=True, autoincrement=True) table_id = Column(String(50), unique=True) + owner_table = Column(String(50)) + partition_tag = Column(String(50)) + version = Column(String(50)) state = Column(Integer) dimension = Column(Integer) created_on = Column(Integer) @@ -72,5 +75,4 @@ class Tables(db.Model): files = self.files.filter(cond) - logger.debug('DATE_RANGE: {}'.format(date_range)) return files diff --git a/shards/mishards/router/plugins/file_based_hash_ring_router.py b/shards/mishards/router/plugins/file_based_hash_ring_router.py index b9093512..c7c221de 100644 --- a/shards/mishards/router/plugins/file_based_hash_ring_router.py +++ b/shards/mishards/router/plugins/file_based_hash_ring_router.py @@ -1,6 +1,6 @@ import logging from sqlalchemy import exc as sqlalchemy_exc -from sqlalchemy import and_ +from sqlalchemy import and_, or_ from mishards.models import Tables from mishards.router import RouterMixin from mishards import exceptions, db @@ -15,23 +15,35 @@ class Factory(RouterMixin): def __init__(self, conn_mgr, **kwargs): super(Factory, self).__init__(conn_mgr) - def routing(self, table_name, metadata=None, **kwargs): + def routing(self, table_name, partition_tags=None, metadata=None, **kwargs): range_array = kwargs.pop('range_array', None) - return self._route(table_name, range_array, metadata, **kwargs) + return self._route(table_name, range_array, partition_tags, metadata, **kwargs) - def _route(self, table_name, range_array, metadata=None, **kwargs): + def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs): # PXU TODO: Implement Thread-local Context # PXU TODO: Session life mgt + + if not partition_tags: + cond = and_( + or_(Tables.table_id == table_name, Tables.owner_table == table_name), + Tables.state != Tables.TO_DELETE) + else: + cond = and_(Tables.state != Tables.TO_DELETE, + Tables.owner_table == table_name, + Tables.partition_tag.in_(partition_tags)) try: - table = db.Session.query(Tables).filter( - and_(Tables.table_id == table_name, - Tables.state != Tables.TO_DELETE)).first() + tables = db.Session.query(Tables).filter(cond).all() except sqlalchemy_exc.SQLAlchemyError as e: raise exceptions.DBError(message=str(e), metadata=metadata) - if not table: - raise exceptions.TableNotFoundError(table_name, metadata=metadata) - files = table.files_to_search(range_array) + if not tables: + raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata) + + total_files = [] + for table in tables: + files = table.files_to_search(range_array) + total_files.append(files) + db.remove_session() servers = self.conn_mgr.conn_names @@ -41,12 +53,13 @@ class Factory(RouterMixin): routing = {} - for f in files: - target_host = ring.get_node(str(f.id)) - sub = routing.get(target_host, None) - if not sub: - routing[target_host] = {'table_id': table_name, 'file_ids': []} - routing[target_host]['file_ids'].append(str(f.id)) + for files in total_files: + for f in files: + target_host = ring.get_node(str(f.id)) + sub = routing.get(target_host, None) + if not sub: + routing[target_host] = {'table_id': f.table_id, 'file_ids': []} + routing[target_host]['file_ids'].append(str(f.id)) return routing diff --git a/shards/mishards/service_handler.py b/shards/mishards/service_handler.py index 1b56c864..66942269 100644 --- a/shards/mishards/service_handler.py +++ b/shards/mishards/service_handler.py @@ -107,6 +107,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): topk, nprobe, range_array=None, + partition_tags=None, **kwargs): metadata = kwargs.get('metadata', None) range_array = [ @@ -119,6 +120,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('get_routing', child_of=p_span): routing = self.router.routing(table_id, range_array=range_array, + partition_tags=partition_tags, metadata=metadata) logger.info('Routing: {}'.format(routing)) @@ -210,6 +212,40 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): error_code=_status.code, reason=_status.message), bool_reply=_bool) + @mark_grpc_method + def CreatePartition(self, request, context): + _table_name, _partition_name, _tag = Parser.parse_proto_PartitionParam(request) + _status = self.router.connection().create_partition(_table_name, _partition_name, _tag) + return status_pb2.Status(error_code=_status.code, + reason=_status.message) + + @mark_grpc_method + def DropPartition(self, request, context): + _table_name, _partition_name, _tag = Parser.parse_proto_PartitionParam(request) + + _status = self.router.connection().drop_partition(_table_name, _tag) + return status_pb2.Status(error_code=_status.code, + reason=_status.message) + + @mark_grpc_method + def ShowPartitions(self, request, context): + _status, _table_name = Parser.parse_proto_TableName(request) + if not _status.OK(): + return milvus_pb2.PartitionList(status=status_pb2.Status( + error_code=_status.code, reason=_status.message), + partition_array=[]) + + logger.info('ShowPartitions {}'.format(_table_name)) + + _status, partition_array = self.router.connection().show_partitions(_table_name) + + return milvus_pb2.PartitionList(status=status_pb2.Status( + error_code=_status.code, reason=_status.message), + partition_array=[milvus_pb2.PartitionParam(table_name=param.table_name, + tag=param.tag, + partition_name=param.partition_name) + for param in partition_array]) + def _delete_table(self, table_name): return self.router.connection().delete_table(table_name) @@ -315,6 +351,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): topk, nprobe, query_range_array, + partition_tags=getattr(request, "partition_tag_array", []), metadata=metadata) now = time.time() diff --git a/shards/requirements.txt b/shards/requirements.txt index 426ee0b7..47e1e521 100644 --- a/shards/requirements.txt +++ b/shards/requirements.txt @@ -14,7 +14,8 @@ py==1.8.0 pyasn1==0.4.7 pyasn1-modules==0.2.6 pylint==2.3.1 -pymilvus==0.2.5 +#pymilvus==0.2.5 +pymilvus-test==0.2.42 pyparsing==2.4.0 pytest==4.6.3 pytest-level==0.1.1 -- GitLab