提交 e6aec3fd 编写于 作者: D dragondriver 提交者: yefu.chen

Hardcode the peer id of proxy node

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 6ef82a59
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"time"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice" ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
...@@ -11,10 +12,12 @@ import ( ...@@ -11,10 +12,12 @@ import (
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice" is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice" ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
) )
type MasterService struct { type MasterService struct {
...@@ -24,6 +27,7 @@ type MasterService struct { ...@@ -24,6 +27,7 @@ type MasterService struct {
proxyService *psc.Client proxyService *psc.Client
dataService *dsc.Client dataService *dsc.Client
indexService *isc.Client indexService *isc.Client
queryService *qsc.Client
} }
func NewMasterService(ctx context.Context) (*MasterService, error) { func NewMasterService(ctx context.Context) (*MasterService, error) {
...@@ -103,6 +107,18 @@ func NewMasterService(ctx context.Context) (*MasterService, error) { ...@@ -103,6 +107,18 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
if err = svr.SetIndexService(indexService); err != nil { if err = svr.SetIndexService(indexService); err != nil {
return nil, err return nil, err
} }
qs.Params.Init()
log.Printf("query service address = %s:%d", qs.Params.Address, qs.Params.Port)
queryService, err := qsc.NewClient(fmt.Sprintf("%s:%d", qs.Params.Address, qs.Params.Port), time.Duration(ms.Params.Timeout)*time.Second)
if err != nil {
return nil, err
}
if err = svr.SetQueryService(queryService); err != nil {
return nil, err
}
return &MasterService{ return &MasterService{
ctx: ctx, ctx: ctx,
svr: svr, svr: svr,
...@@ -110,6 +126,7 @@ func NewMasterService(ctx context.Context) (*MasterService, error) { ...@@ -110,6 +126,7 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
proxyService: proxyService, proxyService: proxyService,
dataService: dataService, dataService: dataService,
indexService: indexService, indexService: indexService,
queryService: queryService,
}, nil }, nil
} }
...@@ -135,6 +152,9 @@ func (m *MasterService) Stop() error { ...@@ -135,6 +152,9 @@ func (m *MasterService) Stop() error {
if m.dataService != nil { if m.dataService != nil {
_ = m.dataService.Stop() _ = m.dataService.Stop()
} }
if m.queryService != nil {
_ = m.queryService.Stop()
}
if m.svr != nil { if m.svr != nil {
return m.svr.Stop() return m.svr.Stop()
} }
......
...@@ -111,6 +111,10 @@ func TestGrpcService(t *testing.T) { ...@@ -111,6 +111,10 @@ func TestGrpcService(t *testing.T) {
return nil return nil
} }
core.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
return nil
}
err = svr.Start() err = svr.Start()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -97,6 +97,14 @@ func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { ...@@ -97,6 +97,14 @@ func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
return c.SetIndexService(p) return c.SetIndexService(p)
} }
func (s *GrpcServer) SetQueryService(q cms.QueryServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set query service failed")
}
return c.SetQueryService(q)
}
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates() return s.core.GetComponentStates()
} }
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
...@@ -47,6 +48,10 @@ type IndexServiceInterface interface { ...@@ -47,6 +48,10 @@ type IndexServiceInterface interface {
BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
} }
type QueryServiceInterface interface {
ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
}
type Interface interface { type Interface interface {
//service //service
Init() error Init() error
...@@ -147,15 +152,18 @@ type Core struct { ...@@ -147,15 +152,18 @@ type Core struct {
//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream //setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID
//TODO,get binlog file path from data service, //get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
//TODO, call index builder's client to build index, return build id //call index builder's client to build index, return build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
//TODO, proxy service interface, notify proxy service to drop collection //proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
//query service interface, notify query service to release collection
ReleaseCollection func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
// put create index task into this chan // put create index task into this chan
indexTaskQueue chan *CreateIndexTask indexTaskQueue chan *CreateIndexTask
...@@ -245,6 +253,10 @@ func (c *Core) checkInit() error { ...@@ -245,6 +253,10 @@ func (c *Core) checkInit() error {
if c.DataNodeSegmentFlushCompletedChan == nil { if c.DataNodeSegmentFlushCompletedChan == nil {
return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil") return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil")
} }
if c.ReleaseCollection == nil {
return errors.Errorf("ReleaseCollection is nil")
}
log.Printf("master node id = %d", Params.NodeID) log.Printf("master node id = %d", Params.NodeID)
log.Printf("master dd channel name = %s", Params.DdChannel) log.Printf("master dd channel name = %s", Params.DdChannel)
log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) log.Printf("master time ticke channel name = %s", Params.TimeTickChannel)
...@@ -691,6 +703,30 @@ func (c *Core) SetIndexService(s IndexServiceInterface) error { ...@@ -691,6 +703,30 @@ func (c *Core) SetIndexService(s IndexServiceInterface) error {
return nil return nil
} }
func (c *Core) SetQueryService(s QueryServiceInterface) error {
c.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kReleaseCollection,
MsgID: 0, //TODO, msg ID
Timestamp: ts,
SourceID: int64(Params.NodeID),
},
DbID: dbID,
CollectionID: collectionID,
}
rsp, err := s.ReleaseCollection(req)
if err != nil {
return err
}
if rsp.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
}
return nil
}
return nil
}
func (c *Core) Init() error { func (c *Core) Init() error {
var initError error = nil var initError error = nil
c.initOnce.Do(func() { c.initOnce.Do(func() {
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
...@@ -79,6 +80,21 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d ...@@ -79,6 +80,21 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d
return rst, nil return rst, nil
} }
type queryMock struct {
collID []typeutil.UniqueID
mutex sync.Mutex
}
func (q *queryMock) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.collID = append(q.collID, req.CollectionID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
}
func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) { func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{ return &milvuspb.StringResponse{
Status: &commonpb.Status{ Status: &commonpb.Status{
...@@ -163,6 +179,13 @@ func TestMasterService(t *testing.T) { ...@@ -163,6 +179,13 @@ func TestMasterService(t *testing.T) {
err = core.SetIndexService(im) err = core.SetIndexService(im)
assert.Nil(t, err) assert.Nil(t, err)
qm := &queryMock{
collID: nil,
mutex: sync.Mutex{},
}
err = core.SetQueryService(qm)
assert.Nil(t, err)
err = core.Init() err = core.Init()
assert.Nil(t, err) assert.Nil(t, err)
...@@ -768,6 +791,12 @@ func TestMasterService(t *testing.T) { ...@@ -768,6 +791,12 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, len(collArray), 1) assert.Equal(t, len(collArray), 1)
assert.Equal(t, collArray[0], "testColl") assert.Equal(t, collArray[0], "testColl")
time.Sleep(time.Millisecond * 100)
qm.mutex.Lock()
assert.Equal(t, len(qm.collID), 1)
assert.Equal(t, qm.collID[0], collMeta.ID)
qm.mutex.Unlock()
req = &milvuspb.DropCollectionRequest{ req = &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection, MsgType: commonpb.MsgType_kDropCollection,
......
package masterservice package masterservice
import ( import (
"log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
...@@ -195,6 +197,7 @@ func (t *DropCollectionReqTask) Execute() error { ...@@ -195,6 +197,7 @@ func (t *DropCollectionReqTask) Execute() error {
if err = t.core.InvalidateCollectionMetaCache(t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil { if err = t.core.InvalidateCollectionMetaCache(t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil {
return err return err
} }
err = t.core.MetaTable.DeleteCollection(collMeta.ID) err = t.core.MetaTable.DeleteCollection(collMeta.ID)
if err != nil { if err != nil {
return err return err
...@@ -214,6 +217,14 @@ func (t *DropCollectionReqTask) Execute() error { ...@@ -214,6 +217,14 @@ func (t *DropCollectionReqTask) Execute() error {
if err != nil { if err != nil {
return err return err
} }
//notify query service to release collection
go func() {
if err = t.core.ReleaseCollection(t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
log.Printf("%s", err.Error())
}
}()
return nil return nil
} }
......
...@@ -121,7 +121,7 @@ func (s *ServiceImpl) Init() error { ...@@ -121,7 +121,7 @@ func (s *ServiceImpl) Init() error {
"proxyservicesub") // TODO: add config "proxyservicesub") // TODO: add config
log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10) ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10)
log.Println("create soft time tick barrier ...") log.Println("create soft time tick barrier ...")
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream)
log.Println("create time tick ...") log.Println("create time tick ...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册