提交 82d6fb18 编写于 作者: S sunby 提交者: yefu.chen

Add init params returned to data node

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 b794921b
......@@ -39,3 +39,64 @@ func TestCollection(t *testing.T) {
_, err = meta.GetCollection(id)
assert.NotNil(t, err)
}
func TestSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
id, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(id, 100, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
info, err := meta.GetSegment(segmentInfo.SegmentID)
assert.Nil(t, err)
assert.EqualValues(t, segmentInfo, info)
ids := meta.GetSegmentsByCollectionID(id)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segmentInfo.SegmentID, ids[0])
ids = meta.GetSegmentsByCollectionAndPartitionID(id, 100)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segmentInfo.SegmentID, ids[0])
err = meta.SealSegment(segmentInfo.SegmentID)
assert.Nil(t, err)
err = meta.FlushSegment(segmentInfo.SegmentID)
assert.Nil(t, err)
info, err = meta.GetSegment(segmentInfo.SegmentID)
assert.Nil(t, err)
assert.NotZero(t, info.SealedTime)
assert.NotZero(t, info.FlushedTime)
}
func TestPartition(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
testSchema := newTestSchema()
id, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddPartition(id, 10)
assert.NotNil(t, err)
err = meta.AddCollection(&collectionInfo{
ID: id,
Schema: testSchema,
Partitions: []UniqueID{},
})
assert.Nil(t, err)
err = meta.AddPartition(id, 10)
assert.Nil(t, err)
err = meta.AddPartition(id, 10)
assert.NotNil(t, err)
collection, err := meta.GetCollection(id)
assert.Nil(t, err)
assert.EqualValues(t, 10, collection.Partitions[0])
err = meta.DropPartition(id, 10)
assert.Nil(t, err)
collection, err = meta.GetCollection(id)
assert.Nil(t, err)
assert.EqualValues(t, 0, len(collection.Partitions))
err = meta.DropPartition(id, 10)
assert.NotNil(t, err)
}
......@@ -29,6 +29,7 @@ type ParamTable struct {
StatisticsChannelName string
TimeTickChannelName string
DataNodeNum int
SegmentChannelName string // todo init
}
var Params ParamTable
......
......@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"log"
"time"
"sync"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
......@@ -49,6 +49,9 @@ type (
Timestamp = typeutil.Timestamp
Server struct {
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state internalpb2.StateCode
client *etcdkv.EtcdKV
meta *meta
......@@ -61,10 +64,11 @@ type (
registerFinishCh chan struct{}
masterClient *masterservice.GrpcClient
ttMsgStream msgstream.MsgStream
ddChannelName string
}
)
func CreateServer(ctx context.Context) (*Server, error) {
func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) {
ch := make(chan struct{})
return &Server{
ctx: ctx,
......@@ -72,6 +76,7 @@ func CreateServer(ctx context.Context) (*Server, error) {
insertChannelMgr: newInsertChannelManager(),
registerFinishCh: ch,
cluster: newDataNodeCluster(ch),
masterClient: client,
}, nil
}
......@@ -81,9 +86,6 @@ func (s *Server) Init() error {
}
func (s *Server) Start() error {
if err := s.connectMaster(); err != nil {
return err
}
s.allocator = newAllocatorImpl(s.masterClient)
if err := s.initMeta(); err != nil {
return err
......@@ -95,34 +97,20 @@ func (s *Server) Start() error {
}
s.segAllocator = segAllocator
s.waitDataNodeRegister()
if err = s.loadMetaFromMaster(); err != nil {
return err
}
if err = s.initMsgProducer(); err != nil {
return err
}
s.startServerLoop()
s.state = internalpb2.StateCode_HEALTHY
log.Println("start success")
return nil
}
func (s *Server) connectMaster() error {
log.Println("connecting to master")
master, err := masterservice.NewGrpcClient(Params.MasterAddress, 30*time.Second)
if err != nil {
return err
}
if err = master.Init(nil); err != nil {
return err
}
if err = master.Start(); err != nil {
return err
}
s.masterClient = master
log.Println("connect to master success")
return nil
}
func (s *Server) initMeta() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
......@@ -144,7 +132,6 @@ func (s *Server) waitDataNodeRegister() {
}
func (s *Server) initMsgProducer() error {
// todo ttstream and peerids
s.ttMsgStream = pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
s.ttMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
......@@ -157,6 +144,37 @@ func (s *Server) initMsgProducer() error {
s.msgProducer.Start(s.ctx)
return nil
}
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(1)
go s.startStatsChannel(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
statsStream.Start()
defer statsStream.Close()
for {
select {
case <-ctx.Done():
return
default:
}
msgPack := statsStream.Consume()
for _, msg := range msgPack.Msgs {
statistics := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range statistics.SegStats {
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
log.Println(err.Error())
continue
}
}
}
}
}
func (s *Server) loadMetaFromMaster() error {
log.Println("loading collection meta from master")
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
......@@ -218,9 +236,15 @@ func (s *Server) loadMetaFromMaster() error {
func (s *Server) Stop() error {
s.ttMsgStream.Close()
s.msgProducer.Close()
s.stopServerLoop()
return nil
}
func (s *Server) stopServerLoop() {
s.serverLoopCancel()
s.serverLoopWg.Wait()
}
func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
resp := &internalpb2.ComponentStates{
State: &internalpb2.ComponentInfo{
......@@ -261,13 +285,31 @@ func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
}
func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID)
// add init params
return &datapb.RegisterNodeResponse{
ret := &datapb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}, nil
}
s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID)
if len(s.ddChannelName) == 0 {
resp, err := s.masterClient.GetDdChannel(nil)
if err != nil {
ret.Status.Reason = err.Error()
return ret, err
}
s.ddChannelName = resp.Value
}
ret.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
ret.InitParams = &internalpb2.InitParams{
NodeID: Params.NodeID,
StartParams: []*commonpb.KeyValuePair{
{Key: "DDChannelName", Value: s.ddChannelName},
{Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName},
{Key: "TimeTickChannelName", Value: Params.TimeTickChannelName},
{Key: "CompleteFlushChannelName", Value: Params.SegmentChannelName},
},
}
return ret, nil
}
func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
......
package dataservice
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
......@@ -16,28 +14,7 @@ func newStatsHandler(meta *meta) *statsHandler {
}
}
func (handler *statsHandler) HandleQueryNodeStats(msgPack *msgstream.MsgPack) error {
for _, msg := range msgPack.Msgs {
statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
if !ok {
return errors.Errorf("Type of message is not QueryNodeSegStatsMsg")
}
for _, segStat := range statsMsg.GetSegStats() {
if err := handler.handleSegmentStat(segStat); err != nil {
return err
}
}
}
return nil
}
func (handler *statsHandler) handleSegmentStat(segStats *internalpb2.SegmentStats) error {
if !segStats.GetRecentlyModified() {
return nil
}
func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStatisticsUpdates) error {
segMeta, err := handler.meta.GetSegment(segStats.SegmentID)
if err != nil {
return err
......
......@@ -4,6 +4,9 @@ import (
"context"
"log"
"net"
"time"
"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
"google.golang.org/grpc"
......@@ -17,17 +20,21 @@ import (
)
type Service struct {
server *dataservice.Server
ctx context.Context
cancel context.CancelFunc
grpcServer *grpc.Server
server *dataservice.Server
ctx context.Context
cancel context.CancelFunc
grpcServer *grpc.Server
masterClient *masterservice.GrpcClient
}
func NewGrpcService() {
s := &Service{}
var err error
s.ctx, s.cancel = context.WithCancel(context.Background())
s.server, err = dataservice.CreateServer(s.ctx)
if err = s.connectMaster(); err != nil {
log.Fatal("connect to master" + err.Error())
}
s.server, err = dataservice.CreateServer(s.ctx, s.masterClient)
if err != nil {
log.Fatalf("create server error: %s", err.Error())
return
......@@ -45,6 +52,22 @@ func NewGrpcService() {
}
}
func (s *Service) connectMaster() error {
log.Println("connecting to master")
master, err := masterservice.NewGrpcClient("localhost:10101", 30*time.Second) // todo address
if err != nil {
return err
}
if err = master.Init(nil); err != nil {
return err
}
if err = master.Start(); err != nil {
return err
}
s.masterClient = master
log.Println("connect to master success")
return nil
}
func (s *Service) Init() error {
return s.server.Init()
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册