提交 0a134dab 编写于 作者: X XuPeng-SH 提交者: Jin Hai

Mishards synced to 0.6.0 release (#725)

* [skip ci](shards): update to support milvus 0.6.0

* [skip ci](shards): add show partitions

* [skip ci](shards): add drop partition

* [skip ci](shards): add create partition

* [skip ci](shards): update search for partition

* [skip ci](shards): fix search partition bugs

* [skip ci](shards): update all in one demo

* [skip ci](shards/doc): update image in readme
上级 6ade5601
......@@ -54,7 +54,7 @@ Follow below steps to start a standalone Milvus instance with Mishards from sour
3. Start Milvus server.
```shell
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
```
4. Update path permissions.
......
......@@ -48,7 +48,7 @@ Python 版本为3.6及以上。
3. 启动 Milvus 服务。
```shell
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
```
4. 更改目录权限。
......
......@@ -3,17 +3,17 @@ services:
milvus_wr:
runtime: nvidia
restart: always
image: milvusdb/milvus
image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
volumes:
- /tmp/milvus/db:/opt/milvus/db
- /tmp/milvus/db:/var/lib/milvus/db
- ./wr_server.yml:/opt/milvus/conf/server_config.yaml
milvus_ro:
runtime: nvidia
restart: always
image: milvusdb/milvus
image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
volumes:
- /tmp/milvus/db:/opt/milvus/db
- /tmp/milvus/db:/var/lib/milvus/db
- ./ro_server.yml:/opt/milvus/conf/server_config.yaml
jaeger:
......
......@@ -5,7 +5,7 @@ server_config:
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
primary_path: /var/lib/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database
......
......@@ -5,7 +5,7 @@ server_config:
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
primary_path: /var/lib/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database
......
......@@ -18,10 +18,10 @@ services:
milvus_wr:
runtime: nvidia
restart: always
image: milvusdb/milvus
image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
volumes:
- /tmp/milvus/db:/opt/milvus/db
- ./wr_server.yml:/opt/milvus/conf/server_config.yaml
- /tmp/milvus/db:/var/lib/milvus/db
- ./wr_server.yml:/var/lib/milvus/conf/server_config.yaml
depends_on:
milvus-mysql:
condition: service_healthy
......@@ -29,10 +29,10 @@ services:
milvus_ro:
runtime: nvidia
restart: always
image: milvusdb/milvus
image: milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
volumes:
- /tmp/milvus/db:/opt/milvus/db
- ./ro_server.yml:/opt/milvus/conf/server_config.yaml
- /tmp/milvus/db:/var/lib/milvus/db
- ./ro_server.yml:/var/lib/milvus/conf/server_config.yaml
depends_on:
- milvus-mysql
- milvus_wr
......
version: 0.1
server_config:
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # port range: 1025 ~ 65534
......@@ -5,7 +6,7 @@ server_config:
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
primary_path: /var/lib/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus
......
version: 0.1
server_config:
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # port range: 1025 ~ 65534
......@@ -5,7 +6,7 @@ server_config:
time_zone: UTC+8
db_config:
primary_path: /opt/milvus # path used to store data and meta
primary_path: /var/lib/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus # URI format: dialect://username:password@host:port/database
......
......@@ -2,7 +2,7 @@ import logging
import threading
from functools import wraps
from milvus import Milvus
from milvus.client.hooks import BaseaSearchHook
from milvus.client.hooks import BaseSearchHook
from mishards import (settings, exceptions)
from utils import singleton
......@@ -10,7 +10,7 @@ from utils import singleton
logger = logging.getLogger(__name__)
class Searchook(BaseaSearchHook):
class Searchook(BaseSearchHook):
def on_response(self, *args, **kwargs):
return True
......@@ -27,7 +27,7 @@ class Connection:
self.on_retry_func = kwargs.get('on_retry_func', None)
# define search hook
self.conn._set_hook(search_in_file=Searchook())
self.conn.set_hook(search_in_file=Searchook())
# self._connect()
def __str__(self):
......
......@@ -76,6 +76,14 @@ class GrpcArgsParser(object):
def parse_proto_RowRecord(cls, param):
return list(param.vector_data)
@classmethod
def parse_proto_PartitionParam(cls, param):
    """Unpack a partition request message into its three fields.

    Reads the ``table_name``, ``partition_name`` and ``tag`` attributes of
    ``param`` and returns them as a
    ``(table_name, partition_name, tag)`` tuple, in that order.
    """
    return param.table_name, param.partition_name, param.tag
@classmethod
@error_status
def parse_proto_SearchParam(cls, param):
......
......@@ -47,6 +47,9 @@ class Tables(db.Model):
id = Column(BigInteger, primary_key=True, autoincrement=True)
table_id = Column(String(50), unique=True)
owner_table = Column(String(50))
partition_tag = Column(String(50))
version = Column(String(50))
state = Column(Integer)
dimension = Column(Integer)
created_on = Column(Integer)
......@@ -72,5 +75,4 @@ class Tables(db.Model):
files = self.files.filter(cond)
logger.debug('DATE_RANGE: {}'.format(date_range))
return files
import logging
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import and_
from sqlalchemy import and_, or_
from mishards.models import Tables
from mishards.router import RouterMixin
from mishards import exceptions, db
......@@ -15,23 +15,35 @@ class Factory(RouterMixin):
def __init__(self, conn_mgr, **kwargs):
super(Factory, self).__init__(conn_mgr)
def routing(self, table_name, metadata=None, **kwargs):
def routing(self, table_name, partition_tags=None, metadata=None, **kwargs):
range_array = kwargs.pop('range_array', None)
return self._route(table_name, range_array, metadata, **kwargs)
return self._route(table_name, range_array, partition_tags, metadata, **kwargs)
def _route(self, table_name, range_array, metadata=None, **kwargs):
def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs):
# PXU TODO: Implement Thread-local Context
# PXU TODO: Session life mgt
if not partition_tags:
cond = and_(
or_(Tables.table_id == table_name, Tables.owner_table == table_name),
Tables.state != Tables.TO_DELETE)
else:
cond = and_(Tables.state != Tables.TO_DELETE,
Tables.owner_table == table_name,
Tables.partition_tag.in_(partition_tags))
try:
table = db.Session.query(Tables).filter(
and_(Tables.table_id == table_name,
Tables.state != Tables.TO_DELETE)).first()
tables = db.Session.query(Tables).filter(cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not table:
raise exceptions.TableNotFoundError(table_name, metadata=metadata)
files = table.files_to_search(range_array)
if not tables:
raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata)
total_files = []
for table in tables:
files = table.files_to_search(range_array)
total_files.append(files)
db.remove_session()
servers = self.conn_mgr.conn_names
......@@ -41,12 +53,13 @@ class Factory(RouterMixin):
routing = {}
for f in files:
target_host = ring.get_node(str(f.id))
sub = routing.get(target_host, None)
if not sub:
routing[target_host] = {'table_id': table_name, 'file_ids': []}
routing[target_host]['file_ids'].append(str(f.id))
for files in total_files:
for f in files:
target_host = ring.get_node(str(f.id))
sub = routing.get(target_host, None)
if not sub:
routing[target_host] = {'table_id': f.table_id, 'file_ids': []}
routing[target_host]['file_ids'].append(str(f.id))
return routing
......
......@@ -107,6 +107,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
topk,
nprobe,
range_array=None,
partition_tags=None,
**kwargs):
metadata = kwargs.get('metadata', None)
range_array = [
......@@ -119,6 +120,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
with self.tracer.start_span('get_routing', child_of=p_span):
routing = self.router.routing(table_id,
range_array=range_array,
partition_tags=partition_tags,
metadata=metadata)
logger.info('Routing: {}'.format(routing))
......@@ -210,6 +212,40 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
error_code=_status.code, reason=_status.message),
bool_reply=_bool)
@mark_grpc_method
def CreatePartition(self, request, context):
    """Handle the CreatePartition RPC.

    The request is unpacked into table name, partition name and tag, which
    are passed to ``create_partition`` on the connection obtained from
    ``self.router.connection()``. The code and message of the returned
    status are copied into a ``status_pb2.Status`` reply.
    """
    table_name, partition_name, tag = Parser.parse_proto_PartitionParam(request)
    conn = self.router.connection()
    result = conn.create_partition(table_name, partition_name, tag)
    return status_pb2.Status(error_code=result.code, reason=result.message)
@mark_grpc_method
def DropPartition(self, request, context):
    """Handle the DropPartition RPC.

    The request is unpacked into table name, partition name and tag; only
    the table name and tag are passed to ``drop_partition`` on the
    connection obtained from ``self.router.connection()``. The code and
    message of the returned status are copied into a ``status_pb2.Status``
    reply.
    """
    # The partition name is parsed but not needed by drop_partition.
    table_name, _, tag = Parser.parse_proto_PartitionParam(request)
    conn = self.router.connection()
    result = conn.drop_partition(table_name, tag)
    return status_pb2.Status(error_code=result.code, reason=result.message)
@mark_grpc_method
def ShowPartitions(self, request, context):
    """Handle the ShowPartitions RPC.

    Parses the table name from the request. If parsing fails, the reply
    carries the parse status and an empty partition list. Otherwise
    ``show_partitions`` is called on the connection obtained from
    ``self.router.connection()`` and each returned entry is converted to a
    ``milvus_pb2.PartitionParam``.

    Returns a ``milvus_pb2.PartitionList`` whose status mirrors the code
    and message of the last status obtained.
    """
    status, table_name = Parser.parse_proto_TableName(request)
    partitions = []

    if status.OK():
        logger.info('ShowPartitions {}'.format(table_name))
        status, found = self.router.connection().show_partitions(table_name)
        # Re-wrap each returned entry as a protobuf PartitionParam.
        partitions = [
            milvus_pb2.PartitionParam(table_name=item.table_name,
                                      tag=item.tag,
                                      partition_name=item.partition_name)
            for item in found
        ]

    reply_status = status_pb2.Status(error_code=status.code,
                                     reason=status.message)
    return milvus_pb2.PartitionList(status=reply_status,
                                    partition_array=partitions)
def _delete_table(self, table_name):
return self.router.connection().delete_table(table_name)
......@@ -315,6 +351,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
topk,
nprobe,
query_range_array,
partition_tags=getattr(request, "partition_tag_array", []),
metadata=metadata)
now = time.time()
......
......@@ -14,7 +14,8 @@ py==1.8.0
pyasn1==0.4.7
pyasn1-modules==0.2.6
pylint==2.3.1
pymilvus==0.2.5
#pymilvus==0.2.5
pymilvus-test==0.2.42
pyparsing==2.4.0
pytest==4.6.3
pytest-level==0.1.1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册