提交 5ac463c9 编写于 作者: S sunby 提交者: yefu.chen

Add datanode client in data service

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 4acaa970
......@@ -51,17 +51,7 @@ type (
}
)
func newReplica() collectionReplica {
collections := make([]*Collection, 0)
segments := make([]*Segment, 0)
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
segments: segments,
}
return replica
}
//----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -71,7 +61,7 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se
return segment, nil
}
}
return nil, errors.Errorf("Cannot find segment, id = %v", segmentID)
return nil, errors.Errorf("cannot find segment, id = %v", segmentID)
}
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID,
......@@ -173,7 +163,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
var newCollection = newCollection(collectionID, schema)
colReplica.collections = append(colReplica.collections, newCollection)
log.Println("Create collection:", newCollection.Name())
log.Println("Create collection: ", newCollection.Name())
return nil
}
......@@ -187,25 +177,25 @@ func (colReplica *collectionReplicaImpl) getCollectionIDByName(collName string)
return collection.ID(), nil
}
}
return 0, errors.Errorf("Cannot get collection ID by name %s: not exist", collName)
return 0, errors.Errorf("There is no collection name=%v", collName)
}
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
// GOOSE TODO: optimize
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
length := len(colReplica.collections)
for index, col := range colReplica.collections {
if col.ID() == collectionID {
log.Println("Drop collection: ", col.Name())
colReplica.collections[index] = colReplica.collections[length-1]
colReplica.collections = colReplica.collections[:length-1]
return nil
tmpCollections := make([]*Collection, 0)
for _, col := range colReplica.collections {
if col.ID() != collectionID {
tmpCollections = append(tmpCollections, col)
} else {
log.Println("Drop collection : ", col.Name())
}
}
return errors.Errorf("Cannot remove collection %d: not exist", collectionID)
colReplica.collections = tmpCollections
return nil
}
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
......@@ -217,7 +207,7 @@ func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID
return collection, nil
}
}
return nil, errors.Errorf("Cannot get collection %d by ID: not exist", collectionID)
return nil, errors.Errorf("cannot find collection, id = %v", collectionID)
}
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
......
......@@ -5,11 +5,20 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
func newReplica() collectionReplica {
collections := make([]*Collection, 0)
segments := make([]*Segment, 0)
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
segments: segments,
}
return replica
}
func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
// GOOSE TODO remove
Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
......@@ -24,133 +33,71 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName
}
func TestReplica_Collection(t *testing.T) {
Factory := &MetaFactory{}
collMetaMock := Factory.CollectionMetaFactory(0, "collection0")
t.Run("Test add collection", func(t *testing.T) {
replica := newReplica()
assert.False(t, replica.hasCollection(0))
num := replica.getCollectionNum()
assert.Equal(t, 0, num)
err := replica.addCollection(0, collMetaMock.GetSchema())
assert.NoError(t, err)
assert.True(t, replica.hasCollection(0))
num = replica.getCollectionNum()
assert.Equal(t, 1, num)
coll, err := replica.getCollectionByID(0)
assert.NoError(t, err)
assert.NotNil(t, coll)
assert.Equal(t, UniqueID(0), coll.ID())
assert.Equal(t, "collection0", coll.Name())
assert.Equal(t, collMetaMock.GetSchema(), coll.Schema())
coll, err = replica.getCollectionByName("collection0")
assert.NoError(t, err)
assert.NotNil(t, coll)
assert.Equal(t, UniqueID(0), coll.ID())
assert.Equal(t, "collection0", coll.Name())
assert.Equal(t, collMetaMock.GetSchema(), coll.Schema())
collID, err := replica.getCollectionIDByName("collection0")
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), collID)
})
t.Run("Test remove collection", func(t *testing.T) {
replica := newReplica()
err := replica.addCollection(0, collMetaMock.GetSchema())
require.NoError(t, err)
numsBefore := replica.getCollectionNum()
coll, err := replica.getCollectionByID(0)
require.NotNil(t, coll)
require.NoError(t, err)
err = replica.removeCollection(0)
assert.NoError(t, err)
numsAfter := replica.getCollectionNum()
assert.Equal(t, 1, numsBefore-numsAfter)
coll, err = replica.getCollectionByID(0)
assert.Nil(t, coll)
assert.Error(t, err)
err = replica.removeCollection(999999999)
assert.Error(t, err)
})
t.Run("Test errors", func(t *testing.T) {
replica := newReplica()
require.False(t, replica.hasCollection(0))
require.Equal(t, 0, replica.getCollectionNum())
coll, err := replica.getCollectionByName("Name-not-exist")
assert.Error(t, err)
assert.Nil(t, coll)
coll, err = replica.getCollectionByID(0)
assert.Error(t, err)
assert.Nil(t, coll)
collID, err := replica.getCollectionIDByName("Name-not-exist")
assert.Error(t, err)
assert.Zero(t, collID)
err = replica.removeCollection(0)
assert.Error(t, err)
})
//----------------------------------------------------------------------------------------------------- collection
func TestCollectionReplica_getCollectionNum(t *testing.T) {
replica := newReplica()
initTestReplicaMeta(t, replica, "collection0", 0, 0)
assert.Equal(t, replica.getCollectionNum(), 1)
}
func TestCollectionReplica_addCollection(t *testing.T) {
replica := newReplica()
initTestReplicaMeta(t, replica, "collection0", 0, 0)
}
func TestReplica_Segment(t *testing.T) {
t.Run("Test segment", func(t *testing.T) {
replica := newReplica()
assert.False(t, replica.hasSegment(0))
func TestCollectionReplica_removeCollection(t *testing.T) {
replica := newReplica()
initTestReplicaMeta(t, replica, "collection0", 0, 0)
assert.Equal(t, replica.getCollectionNum(), 1)
err := replica.addSegment(0, 1, 2, make([]*internalpb2.MsgPosition, 0))
assert.NoError(t, err)
assert.True(t, replica.hasSegment(0))
err := replica.removeCollection(0)
assert.NoError(t, err)
assert.Equal(t, replica.getCollectionNum(), 0)
}
seg, err := replica.getSegmentByID(0)
assert.NoError(t, err)
assert.NotNil(t, seg)
assert.Equal(t, UniqueID(1), seg.collectionID)
assert.Equal(t, UniqueID(2), seg.partitionID)
func TestCollectionReplica_getCollectionByID(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
targetCollection, err := replica.getCollectionByID(collectionID)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
}
assert.Equal(t, int64(0), seg.numRows)
func TestCollectionReplica_getCollectionByName(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
err = replica.updateStatistics(0, 100)
assert.NoError(t, err)
assert.Equal(t, int64(100), seg.numRows)
targetCollection, err := replica.getCollectionByName(collectionName)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
update, err := replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), update.SegmentID)
assert.Equal(t, int64(100), update.NumRows)
assert.True(t, update.IsNewSegment)
})
}
t.Run("Test errors", func(t *testing.T) {
replica := newReplica()
require.False(t, replica.hasSegment(0))
func TestCollectionReplica_hasCollection(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
seg, err := replica.getSegmentByID(0)
assert.Error(t, err)
assert.Nil(t, seg)
hasCollection := replica.hasCollection(collectionID)
assert.Equal(t, hasCollection, true)
hasCollection = replica.hasCollection(UniqueID(1))
assert.Equal(t, hasCollection, false)
err = replica.removeSegment(0)
assert.Error(t, err)
}
err = replica.updateStatistics(0, 0)
assert.Error(t, err)
func TestCollectionReplica_freeAll(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
update, err := replica.getSegmentStatisticsUpdates(0)
assert.Error(t, err)
assert.Nil(t, update)
})
}
......@@ -16,7 +16,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
......@@ -39,6 +38,7 @@ type (
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
SetMasterServiceInterface(ms MasterServiceInterface) error
SetDataServiceInterface(ds DataServiceInterface) error
}
......@@ -55,6 +55,7 @@ type (
}
DataNode struct {
// GOOSE TODO: complete interface with component
ctx context.Context
NodeID UniqueID
Role string
......@@ -79,8 +80,8 @@ func NewDataNode(ctx context.Context) *DataNode {
Params.Init()
node := &DataNode{
ctx: ctx,
NodeID: Params.NodeID, // GOOSE TODO How to init
Role: typeutil.DataNodeRole,
NodeID: Params.NodeID, // GOOSE TODO
Role: "DataNode", // GOOSE TODO
State: internalpb2.StateCode_INITIALIZING,
dataSyncService: nil,
metaService: nil,
......@@ -106,7 +107,7 @@ func (node *DataNode) Init() error {
req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kNone,
MsgType: commonpb.MsgType_kNone, //GOOSE TODO
SourceID: node.NodeID,
},
Address: &commonpb.Address{
......@@ -117,10 +118,11 @@ func (node *DataNode) Init() error {
resp, err := node.dataService.RegisterNode(req)
if err != nil {
return errors.Errorf("Register node failed: %v", err)
return errors.Errorf("Init failed: %v", err)
}
for _, kv := range resp.InitParams.StartParams {
log.Println(kv)
switch kv.Key {
case "DDChannelName":
Params.DDChannelNames = []string{kv.Value}
......@@ -148,7 +150,7 @@ func (node *DataNode) Init() error {
node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica
// --- Opentracing ---
// Opentracing
cfg := &config.Configuration{
ServiceName: "data_node",
Sampler: &config.SamplerConfig{
......@@ -165,6 +167,7 @@ func (node *DataNode) Init() error {
}
node.tracer = tracer
node.closer = closer
opentracing.SetGlobalTracer(node.tracer)
return nil
......@@ -180,14 +183,12 @@ func (node *DataNode) Start() error {
}
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
log.Println("Init insert channel names:", in.GetChannelNames())
Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...)
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
}
func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
log.Println("DataNode current state:", node.State)
states := &internalpb2.ComponentStates{
State: &internalpb2.ComponentInfo{
NodeID: Params.NodeID,
......
......@@ -17,7 +17,6 @@ type (
}
AllocatorFactory struct {
ID UniqueID
}
MasterServiceFactory struct {
......@@ -162,23 +161,9 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
return &collection
}
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
f := &AllocatorFactory{}
if len(id) == 1 {
f.ID = id[0]
}
return f
}
func (alloc AllocatorFactory) setID(id UniqueID) {
alloc.ID = id
}
func (alloc AllocatorFactory) allocID() (UniqueID, error) {
if alloc.ID == 0 {
return UniqueID(0), nil // GOOSE TODO: random ID generating
}
return alloc.ID, nil
// GOOSE TODO: random ID generate
return UniqueID(0), nil
}
func (m *MasterServiceFactory) setID(id UniqueID) {
......
......@@ -112,7 +112,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
case commonpb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
default:
log.Println("Not supporting message type:", msg.Type())
log.Println("Non supporting message type:", msg.Type())
}
}
......@@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
}
default:
log.Println(". default: do nothing ...")
log.Println("..........default do nothing")
}
// generate binlog
......@@ -303,8 +303,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
}
ddNode.ddRecords.partitionRecords[partitionID] = nil
partitionName := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
partitionTag := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
metaOperateRecord{
createOrDrop: true,
timestamp: msg.Base.Timestamp,
......@@ -341,8 +341,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
}
delete(ddNode.ddRecords.partitionRecords, partitionID)
partitionName := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
partitionTag := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
metaOperateRecord{
createOrDrop: false,
timestamp: msg.Base.Timestamp,
......
......@@ -2,7 +2,6 @@ package datanode
import (
"context"
"log"
"testing"
"time"
......@@ -36,57 +35,51 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
require.NoError(t, err)
Params.MetaRootPath = testPath
// Params.FlushDdBufferSize = 4
Params.FlushDdBufferSize = 4
replica := newReplica()
allocatorMock := NewAllocatorFactory()
ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock)
log.Print()
idFactory := AllocatorFactory{}
ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, idFactory)
collID := UniqueID(0)
collName := "col-test-0"
colID := UniqueID(0)
colName := "col-test-0"
// create collection
createCollReq := internalpb2.CreateCollectionRequest{
createColReq := internalpb2.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateCollection,
MsgID: 1,
Timestamp: 1,
SourceID: 1,
},
CollectionID: collID,
Schema: make([]byte, 0),
CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
CollectionID: colID,
Schema: make([]byte, 0),
}
createCollMsg := msgstream.CreateCollectionMsg{
createColMsg := msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(1),
EndTimestamp: Timestamp(1),
HashValues: []uint32{uint32(0)},
},
CreateCollectionRequest: createCollReq,
CreateCollectionRequest: createColReq,
}
// drop collection
dropCollReq := internalpb2.DropCollectionRequest{
dropColReq := internalpb2.DropCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection,
MsgID: 2,
Timestamp: 2,
SourceID: 2,
},
CollectionID: collID,
CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
CollectionID: colID,
CollectionName: colName,
}
dropCollMsg := msgstream.DropCollectionMsg{
dropColMsg := msgstream.DropCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(2),
EndTimestamp: Timestamp(2),
HashValues: []uint32{uint32(0)},
},
DropCollectionRequest: dropCollReq,
DropCollectionRequest: dropColReq,
}
partitionID := UniqueID(100)
......@@ -99,12 +92,10 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
Timestamp: 3,
SourceID: 3,
},
CollectionID: collID,
CollectionID: colID,
PartitionID: partitionID,
CollectionName: collName,
CollectionName: colName,
PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
}
createPartitionMsg := msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{
......@@ -123,12 +114,10 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
Timestamp: 4,
SourceID: 4,
},
CollectionID: collID,
CollectionID: colID,
PartitionID: partitionID,
CollectionName: collName,
CollectionName: colName,
PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
}
dropPartitionMsg := msgstream.DropPartitionMsg{
BaseMsg: msgstream.BaseMsg{
......@@ -139,17 +128,16 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
DropPartitionRequest: dropPartitionReq,
}
replica.addSegment(1, collID, partitionID, make([]*internalpb2.MsgPosition, 0))
inFlushCh <- &flushMsg{
msgID: 5,
timestamp: 5,
msgID: 1,
timestamp: 6,
segmentIDs: []UniqueID{1},
collectionID: collID,
collectionID: UniqueID(1),
}
tsMessages := make([]msgstream.TsMsg, 0)
tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0))
......
......@@ -27,7 +27,6 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ
}
func (mService *metaService) init() {
log.Println("Initing meta ...")
err := mService.loadCollections()
if err != nil {
log.Fatal("metaService init failed:", err)
......
......@@ -10,8 +10,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
)
type (
......@@ -21,7 +19,7 @@ type (
ip string
port int64
}
client *datanode.Client
client DataNodeClient
channelNum int
}
dataNodeCluster struct {
......@@ -38,18 +36,11 @@ func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
}
}
func (c *dataNodeCluster) Register(ip string, port int64, id int64) {
func (c *dataNodeCluster) Register(dataNode *dataNode) {
c.mu.Lock()
defer c.mu.Unlock()
if c.checkDataNodeNotExist(ip, port) {
c.nodes = append(c.nodes, &dataNode{
id: id,
address: struct {
ip string
port int64
}{ip: ip, port: port},
channelNum: 0,
})
if c.checkDataNodeNotExist(dataNode.address.ip, dataNode.address.port) {
c.nodes = append(c.nodes, dataNode)
if len(c.nodes) == Params.DataNodeNum {
close(c.finishCh)
}
......@@ -125,3 +116,12 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
}
}
}
func (c *dataNodeCluster) ShutDownClients() {
for _, node := range c.nodes {
if err := node.client.Stop(); err != nil {
log.Println(err.Error())
continue
}
}
}
......@@ -11,6 +11,8 @@ import (
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
......@@ -61,6 +63,13 @@ type MasterClient interface {
GetComponentStates() (*internalpb2.ComponentStates, error)
}
type DataNodeClient interface {
WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error)
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
Stop() error
}
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
......@@ -380,6 +389,7 @@ func (s *Server) waitDataNodeRegister() {
}
func (s *Server) Stop() error {
s.cluster.ShutDownClients()
s.ttMsgStream.Close()
s.k2sMsgStream.Close()
s.msgProducer.Close()
......@@ -428,7 +438,11 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID)
node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID)
if err != nil {
return nil, err
}
s.cluster.Register(node)
if s.ddChannelName == "" {
resp, err := s.masterClient.GetDdChannel()
if err != nil {
......@@ -450,6 +464,25 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
return ret, nil
}
func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) {
client := datanode.NewClient(fmt.Sprintf("%s:%d", ip, port))
if err := client.Init(); err != nil {
return nil, err
}
if err := client.Start(); err != nil {
return nil, err
}
return &dataNode{
id: id,
address: struct {
ip string
port int64
}{ip: ip, port: port},
client: client,
channelNum: 0,
}, nil
}
func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
if !s.checkStateIsHealthy() {
return &commonpb.Status{
......
......@@ -7,6 +7,7 @@ import (
"sync"
dn "github.com/zilliztech/milvus-distributed/internal/datanode"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......@@ -66,6 +67,11 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
}
func (s *Server) Init() error {
err := s.core.Init()
if err != nil {
return errors.Errorf("Init failed: %v", err)
}
return s.core.Init()
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册