提交 bea15354 编写于 作者: N neza2017 提交者: yefu.chen

Add call of release collection when drop collection

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 1578c132
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"time"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
......@@ -11,10 +12,12 @@ import (
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
)
type MasterService struct {
......@@ -24,6 +27,7 @@ type MasterService struct {
proxyService *psc.Client
dataService *dsc.Client
indexService *isc.Client
queryService *qsc.Client
}
func NewMasterService(ctx context.Context) (*MasterService, error) {
......@@ -103,6 +107,18 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
if err = svr.SetIndexService(indexService); err != nil {
return nil, err
}
qs.Params.Init()
log.Printf("query service address = %s:%d", qs.Params.Address, qs.Params.Port)
queryService, err := qsc.NewClient(fmt.Sprintf("%s:%d", qs.Params.Address, qs.Params.Port), time.Duration(ms.Params.Timeout)*time.Second)
if err != nil {
return nil, err
}
if err = svr.SetQueryService(queryService); err != nil {
return nil, err
}
return &MasterService{
ctx: ctx,
svr: svr,
......@@ -110,6 +126,7 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
proxyService: proxyService,
dataService: dataService,
indexService: indexService,
queryService: queryService,
}, nil
}
......@@ -135,6 +152,9 @@ func (m *MasterService) Stop() error {
if m.dataService != nil {
_ = m.dataService.Stop()
}
if m.queryService != nil {
_ = m.queryService.Stop()
}
if m.svr != nil {
return m.svr.Stop()
}
......
......@@ -111,6 +111,10 @@ func TestGrpcService(t *testing.T) {
return nil
}
core.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
return nil
}
err = svr.Start()
assert.Nil(t, err)
......
......@@ -97,6 +97,14 @@ func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
return c.SetIndexService(p)
}
func (s *GrpcServer) SetQueryService(q cms.QueryServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set query service failed")
}
return c.SetQueryService(q)
}
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates()
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
......@@ -47,6 +48,10 @@ type IndexServiceInterface interface {
BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
}
type QueryServiceInterface interface {
ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
}
type Interface interface {
//service
Init() error
......@@ -147,15 +152,18 @@ type Core struct {
//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID
//TODO,get binlog file path from data service,
//get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
//TODO, call index builder's client to build index, return build id
//call index builder's client to build index, return build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
//TODO, proxy service interface, notify proxy service to drop collection
//proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
//query service interface, notify query service to release collection
ReleaseCollection func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
// put create index task into this chan
indexTaskQueue chan *CreateIndexTask
......@@ -245,6 +253,10 @@ func (c *Core) checkInit() error {
if c.DataNodeSegmentFlushCompletedChan == nil {
return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil")
}
if c.ReleaseCollection == nil {
return errors.Errorf("ReleaseCollection is nil")
}
log.Printf("master node id = %d", Params.NodeID)
log.Printf("master dd channel name = %s", Params.DdChannel)
log.Printf("master time ticke channel name = %s", Params.TimeTickChannel)
......@@ -691,6 +703,30 @@ func (c *Core) SetIndexService(s IndexServiceInterface) error {
return nil
}
func (c *Core) SetQueryService(s QueryServiceInterface) error {
c.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kReleaseCollection,
MsgID: 0, //TODO, msg ID
Timestamp: ts,
SourceID: int64(Params.NodeID),
},
DbID: dbID,
CollectionID: collectionID,
}
rsp, err := s.ReleaseCollection(req)
if err != nil {
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
}
return nil
}
return nil
}
func (c *Core) Init() error {
var initError error = nil
c.initOnce.Do(func() {
......
......@@ -18,6 +18,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
......@@ -79,6 +80,21 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d
return rst, nil
}
type queryMock struct {
collID []typeutil.UniqueID
mutex sync.Mutex
}
func (q *queryMock) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.collID = append(q.collID, req.CollectionID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
}
func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
......@@ -163,6 +179,13 @@ func TestMasterService(t *testing.T) {
err = core.SetIndexService(im)
assert.Nil(t, err)
qm := &queryMock{
collID: nil,
mutex: sync.Mutex{},
}
err = core.SetQueryService(qm)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
......@@ -768,6 +791,12 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, len(collArray), 1)
assert.Equal(t, collArray[0], "testColl")
time.Sleep(time.Millisecond * 100)
qm.mutex.Lock()
assert.Equal(t, len(qm.collID), 1)
assert.Equal(t, qm.collID[0], collMeta.ID)
qm.mutex.Unlock()
req = &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection,
......
package masterservice
import (
"log"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -195,6 +197,7 @@ func (t *DropCollectionReqTask) Execute() error {
if err = t.core.InvalidateCollectionMetaCache(t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil {
return err
}
err = t.core.MetaTable.DeleteCollection(collMeta.ID)
if err != nil {
return err
......@@ -214,6 +217,14 @@ func (t *DropCollectionReqTask) Execute() error {
if err != nil {
return err
}
//notify query service to release collection
go func() {
if err = t.core.ReleaseCollection(t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
log.Printf("%s", err.Error())
}
}()
return nil
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册