提交 9f72633f 编写于 作者: S sunby 提交者: yefu.chen

Add dd handler

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 0e6985b4
......@@ -89,6 +89,7 @@ func main() {
panic(err)
}
is.Params.Init()
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
......@@ -96,6 +97,10 @@ func main() {
panic(err)
}
if err = svr.Init(); err != nil {
panic(err)
}
if err = svr.Start(); err != nil {
panic(err)
}
......
package dataservice
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type ddHandler struct {
meta *meta
segmentAllocator segmentAllocator
}
func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler {
return &ddHandler{
meta: meta,
segmentAllocator: allocator,
}
}
func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error {
switch msg.Type() {
case commonpb.MsgType_kCreateCollection:
realMsg := msg.(*msgstream.CreateCollectionMsg)
return handler.handleCreateCollection(realMsg)
case commonpb.MsgType_kDropCollection:
realMsg := msg.(*msgstream.DropCollectionMsg)
return handler.handleDropCollection(realMsg)
case commonpb.MsgType_kCreatePartition:
realMsg := msg.(*msgstream.CreatePartitionMsg)
return handler.handleCreatePartition(realMsg)
case commonpb.MsgType_kDropPartition:
realMsg := msg.(*msgstream.DropPartitionMsg)
return handler.handleDropPartition(realMsg)
default:
return fmt.Errorf("unknown msg type: %v", msg.Type())
}
}
func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollectionMsg) error {
schema := &schemapb.CollectionSchema{}
if err := proto.Unmarshal(msg.Schema, schema); err != nil {
return err
}
err := handler.meta.AddCollection(&collectionInfo{
ID: msg.CollectionID,
Schema: schema,
})
if err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error {
ids := handler.meta.GetSegmentsByCollectionID(msg.CollectionID)
for _, id := range ids {
if err := handler.meta.DropSegment(id); err != nil {
continue
}
handler.segmentAllocator.DropSegment(id)
}
if err := handler.meta.DropCollection(msg.CollectionID); err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error {
ids := handler.meta.GetSegmentsByCollectionAndPartitionID(msg.CollectionID, msg.PartitionID)
for _, id := range ids {
if err := handler.meta.DropSegment(id); err != nil {
return err
}
handler.segmentAllocator.DropSegment(id)
}
if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleCreatePartition(msg *msgstream.CreatePartitionMsg) error {
return handler.meta.AddPartition(msg.CollectionID, msg.PartitionID)
}
......@@ -65,26 +65,26 @@ type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
Server struct {
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient MasterClient
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
segmentFlushStream msgstream.MsgStream
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
ddHandler *ddHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient MasterClient
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
}
)
......@@ -97,7 +97,6 @@ func CreateServer(ctx context.Context) (*Server, error) {
registerFinishCh: ch,
cluster: newDataNodeCluster(ch),
}
s.state.Store(internalpb2.StateCode_INITIALIZING)
return s, nil
}
......@@ -106,6 +105,7 @@ func (s *Server) SetMasterClient(masterClient MasterClient) {
}
func (s *Server) Init() error {
s.state.Store(internalpb2.StateCode_INITIALIZING)
return nil
}
......@@ -120,6 +120,7 @@ func (s *Server) Start() error {
if err != nil {
return err
}
s.ddHandler = newDDHandler(s.meta, s.segAllocator)
s.initSegmentInfoChannel()
if err = s.initMsgProducer(); err != nil {
return err
......@@ -187,6 +188,13 @@ func (s *Server) loadMetaFromMaster() error {
if err := s.checkMasterIsHealthy(); err != nil {
return err
}
if s.ddChannelName == "" {
channel, err := s.masterClient.GetDdChannel()
if err != nil {
return err
}
s.ddChannelName = channel
}
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
......@@ -274,9 +282,10 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(2)
s.serverLoopWg.Add(3)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startDDChannel(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
......@@ -340,6 +349,30 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
}
}
func (s *Server) startDDChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
ddStream.Start()
defer ddStream.Close()
for {
select {
case <-ctx.Done():
log.Println("dd channel shut down")
return
default:
}
msgPack := ddStream.Consume()
for _, msg := range msgPack.Msgs {
if err := s.ddHandler.HandleDDMsg(msg); err != nil {
log.Println(err.Error())
continue
}
}
}
}
func (s *Server) waitDataNodeRegister() {
log.Println("waiting data node to register")
<-s.registerFinishCh
......@@ -512,8 +545,8 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
Timestamp: 0, // todo
SourceID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Segment: segmentInfo,
},
......
......@@ -35,6 +35,14 @@ func NewGrpcService(ctx context.Context) *Service {
log.Fatalf("create server error: %s", err.Error())
return nil
}
return s
}
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
s.server.SetMasterClient(masterClient)
}
func (s *Service) Init() error {
s.grpcServer = grpc.NewServer()
datapb.RegisterDataServiceServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
......@@ -46,14 +54,6 @@ func NewGrpcService(ctx context.Context) *Service {
log.Fatal(err.Error())
return nil
}
return s
}
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
s.server.SetMasterClient(masterClient)
}
func (s *Service) Init() error {
return s.server.Init()
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册