提交 4acaa970 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Add unittests for datanode and queryservice

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 f1afd5d3
...@@ -51,7 +51,17 @@ type ( ...@@ -51,7 +51,17 @@ type (
} }
) )
//----------------------------------------------------------------------------------------------------- collection func newReplica() collectionReplica {
collections := make([]*Collection, 0)
segments := make([]*Segment, 0)
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
segments: segments,
}
return replica
}
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
colReplica.mu.RLock() colReplica.mu.RLock()
defer colReplica.mu.RUnlock() defer colReplica.mu.RUnlock()
...@@ -61,7 +71,7 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se ...@@ -61,7 +71,7 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se
return segment, nil return segment, nil
} }
} }
return nil, errors.Errorf("cannot find segment, id = %v", segmentID) return nil, errors.Errorf("Cannot find segment, id = %v", segmentID)
} }
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID, func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID,
...@@ -163,7 +173,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc ...@@ -163,7 +173,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
var newCollection = newCollection(collectionID, schema) var newCollection = newCollection(collectionID, schema)
colReplica.collections = append(colReplica.collections, newCollection) colReplica.collections = append(colReplica.collections, newCollection)
log.Println("Create collection: ", newCollection.Name()) log.Println("Create collection:", newCollection.Name())
return nil return nil
} }
...@@ -177,25 +187,25 @@ func (colReplica *collectionReplicaImpl) getCollectionIDByName(collName string) ...@@ -177,25 +187,25 @@ func (colReplica *collectionReplicaImpl) getCollectionIDByName(collName string)
return collection.ID(), nil return collection.ID(), nil
} }
} }
return 0, errors.Errorf("There is no collection name=%v", collName) return 0, errors.Errorf("Cannot get collection ID by name %s: not exist", collName)
} }
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
// GOOSE TODO: optimize
colReplica.mu.Lock() colReplica.mu.Lock()
defer colReplica.mu.Unlock() defer colReplica.mu.Unlock()
tmpCollections := make([]*Collection, 0) length := len(colReplica.collections)
for _, col := range colReplica.collections { for index, col := range colReplica.collections {
if col.ID() != collectionID { if col.ID() == collectionID {
tmpCollections = append(tmpCollections, col) log.Println("Drop collection: ", col.Name())
} else { colReplica.collections[index] = colReplica.collections[length-1]
log.Println("Drop collection : ", col.Name()) colReplica.collections = colReplica.collections[:length-1]
return nil
} }
} }
colReplica.collections = tmpCollections
return nil return errors.Errorf("Cannot remove collection %d: not exist", collectionID)
} }
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
...@@ -207,7 +217,7 @@ func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID ...@@ -207,7 +217,7 @@ func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID
return collection, nil return collection, nil
} }
} }
return nil, errors.Errorf("cannot find collection, id = %v", collectionID) return nil, errors.Errorf("Cannot get collection %d by ID: not exist", collectionID)
} }
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) { func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
......
...@@ -5,20 +5,11 @@ import ( ...@@ -5,20 +5,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
) )
func newReplica() collectionReplica {
collections := make([]*Collection, 0)
segments := make([]*Segment, 0)
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
segments: segments,
}
return replica
}
func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) { func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
// GOOSE TODO remove
Factory := &MetaFactory{} Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
...@@ -33,71 +24,133 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName ...@@ -33,71 +24,133 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName
} }
//----------------------------------------------------------------------------------------------------- collection func TestReplica_Collection(t *testing.T) {
func TestCollectionReplica_getCollectionNum(t *testing.T) { Factory := &MetaFactory{}
replica := newReplica() collMetaMock := Factory.CollectionMetaFactory(0, "collection0")
initTestReplicaMeta(t, replica, "collection0", 0, 0)
assert.Equal(t, replica.getCollectionNum(), 1) t.Run("Test add collection", func(t *testing.T) {
}
replica := newReplica()
assert.False(t, replica.hasCollection(0))
num := replica.getCollectionNum()
assert.Equal(t, 0, num)
err := replica.addCollection(0, collMetaMock.GetSchema())
assert.NoError(t, err)
assert.True(t, replica.hasCollection(0))
num = replica.getCollectionNum()
assert.Equal(t, 1, num)
coll, err := replica.getCollectionByID(0)
assert.NoError(t, err)
assert.NotNil(t, coll)
assert.Equal(t, UniqueID(0), coll.ID())
assert.Equal(t, "collection0", coll.Name())
assert.Equal(t, collMetaMock.GetSchema(), coll.Schema())
coll, err = replica.getCollectionByName("collection0")
assert.NoError(t, err)
assert.NotNil(t, coll)
assert.Equal(t, UniqueID(0), coll.ID())
assert.Equal(t, "collection0", coll.Name())
assert.Equal(t, collMetaMock.GetSchema(), coll.Schema())
collID, err := replica.getCollectionIDByName("collection0")
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), collID)
})
t.Run("Test remove collection", func(t *testing.T) {
replica := newReplica()
err := replica.addCollection(0, collMetaMock.GetSchema())
require.NoError(t, err)
numsBefore := replica.getCollectionNum()
coll, err := replica.getCollectionByID(0)
require.NotNil(t, coll)
require.NoError(t, err)
err = replica.removeCollection(0)
assert.NoError(t, err)
numsAfter := replica.getCollectionNum()
assert.Equal(t, 1, numsBefore-numsAfter)
coll, err = replica.getCollectionByID(0)
assert.Nil(t, coll)
assert.Error(t, err)
err = replica.removeCollection(999999999)
assert.Error(t, err)
})
t.Run("Test errors", func(t *testing.T) {
replica := newReplica()
require.False(t, replica.hasCollection(0))
require.Equal(t, 0, replica.getCollectionNum())
coll, err := replica.getCollectionByName("Name-not-exist")
assert.Error(t, err)
assert.Nil(t, coll)
coll, err = replica.getCollectionByID(0)
assert.Error(t, err)
assert.Nil(t, coll)
collID, err := replica.getCollectionIDByName("Name-not-exist")
assert.Error(t, err)
assert.Zero(t, collID)
err = replica.removeCollection(0)
assert.Error(t, err)
})
func TestCollectionReplica_addCollection(t *testing.T) {
replica := newReplica()
initTestReplicaMeta(t, replica, "collection0", 0, 0)
} }
func TestCollectionReplica_removeCollection(t *testing.T) { func TestReplica_Segment(t *testing.T) {
replica := newReplica() t.Run("Test segment", func(t *testing.T) {
initTestReplicaMeta(t, replica, "collection0", 0, 0) replica := newReplica()
assert.Equal(t, replica.getCollectionNum(), 1) assert.False(t, replica.hasSegment(0))
err := replica.removeCollection(0) err := replica.addSegment(0, 1, 2, make([]*internalpb2.MsgPosition, 0))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, replica.getCollectionNum(), 0) assert.True(t, replica.hasSegment(0))
}
func TestCollectionReplica_getCollectionByID(t *testing.T) { seg, err := replica.getSegmentByID(0)
replica := newReplica() assert.NoError(t, err)
collectionName := "collection0" assert.NotNil(t, seg)
collectionID := UniqueID(0) assert.Equal(t, UniqueID(1), seg.collectionID)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0) assert.Equal(t, UniqueID(2), seg.partitionID)
targetCollection, err := replica.getCollectionByID(collectionID)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
}
func TestCollectionReplica_getCollectionByName(t *testing.T) { assert.Equal(t, int64(0), seg.numRows)
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
targetCollection, err := replica.getCollectionByName(collectionName) err = replica.updateStatistics(0, 100)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, targetCollection) assert.Equal(t, int64(100), seg.numRows)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
} update, err := replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), update.SegmentID)
assert.Equal(t, int64(100), update.NumRows)
assert.True(t, update.IsNewSegment)
})
func TestCollectionReplica_hasCollection(t *testing.T) { t.Run("Test errors", func(t *testing.T) {
replica := newReplica() replica := newReplica()
collectionName := "collection0" require.False(t, replica.hasSegment(0))
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
hasCollection := replica.hasCollection(collectionID) seg, err := replica.getSegmentByID(0)
assert.Equal(t, hasCollection, true) assert.Error(t, err)
hasCollection = replica.hasCollection(UniqueID(1)) assert.Nil(t, seg)
assert.Equal(t, hasCollection, false)
} err = replica.removeSegment(0)
assert.Error(t, err)
func TestCollectionReplica_freeAll(t *testing.T) { err = replica.updateStatistics(0, 0)
replica := newReplica() assert.Error(t, err)
collectionName := "collection0"
collectionID := UniqueID(0)
initTestReplicaMeta(t, replica, collectionName, collectionID, 0)
update, err := replica.getSegmentStatisticsUpdates(0)
assert.Error(t, err)
assert.Nil(t, update)
})
} }
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
const ( const (
...@@ -38,7 +39,6 @@ type ( ...@@ -38,7 +39,6 @@ type (
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
SetMasterServiceInterface(ms MasterServiceInterface) error SetMasterServiceInterface(ms MasterServiceInterface) error
SetDataServiceInterface(ds DataServiceInterface) error SetDataServiceInterface(ds DataServiceInterface) error
} }
...@@ -55,7 +55,6 @@ type ( ...@@ -55,7 +55,6 @@ type (
} }
DataNode struct { DataNode struct {
// GOOSE TODO: complete interface with component
ctx context.Context ctx context.Context
NodeID UniqueID NodeID UniqueID
Role string Role string
...@@ -80,8 +79,8 @@ func NewDataNode(ctx context.Context) *DataNode { ...@@ -80,8 +79,8 @@ func NewDataNode(ctx context.Context) *DataNode {
Params.Init() Params.Init()
node := &DataNode{ node := &DataNode{
ctx: ctx, ctx: ctx,
NodeID: Params.NodeID, // GOOSE TODO NodeID: Params.NodeID, // GOOSE TODO How to init
Role: "DataNode", // GOOSE TODO Role: typeutil.DataNodeRole,
State: internalpb2.StateCode_INITIALIZING, State: internalpb2.StateCode_INITIALIZING,
dataSyncService: nil, dataSyncService: nil,
metaService: nil, metaService: nil,
...@@ -107,7 +106,7 @@ func (node *DataNode) Init() error { ...@@ -107,7 +106,7 @@ func (node *DataNode) Init() error {
req := &datapb.RegisterNodeRequest{ req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kNone, //GOOSE TODO MsgType: commonpb.MsgType_kNone,
SourceID: node.NodeID, SourceID: node.NodeID,
}, },
Address: &commonpb.Address{ Address: &commonpb.Address{
...@@ -118,11 +117,10 @@ func (node *DataNode) Init() error { ...@@ -118,11 +117,10 @@ func (node *DataNode) Init() error {
resp, err := node.dataService.RegisterNode(req) resp, err := node.dataService.RegisterNode(req)
if err != nil { if err != nil {
return errors.Errorf("Init failed: %v", err) return errors.Errorf("Register node failed: %v", err)
} }
for _, kv := range resp.InitParams.StartParams { for _, kv := range resp.InitParams.StartParams {
log.Println(kv)
switch kv.Key { switch kv.Key {
case "DDChannelName": case "DDChannelName":
Params.DDChannelNames = []string{kv.Value} Params.DDChannelNames = []string{kv.Value}
...@@ -150,7 +148,7 @@ func (node *DataNode) Init() error { ...@@ -150,7 +148,7 @@ func (node *DataNode) Init() error {
node.metaService = newMetaService(node.ctx, replica, node.masterService) node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica node.replica = replica
// Opentracing // --- Opentracing ---
cfg := &config.Configuration{ cfg := &config.Configuration{
ServiceName: "data_node", ServiceName: "data_node",
Sampler: &config.SamplerConfig{ Sampler: &config.SamplerConfig{
...@@ -167,7 +165,6 @@ func (node *DataNode) Init() error { ...@@ -167,7 +165,6 @@ func (node *DataNode) Init() error {
} }
node.tracer = tracer node.tracer = tracer
node.closer = closer node.closer = closer
opentracing.SetGlobalTracer(node.tracer) opentracing.SetGlobalTracer(node.tracer)
return nil return nil
...@@ -183,12 +180,14 @@ func (node *DataNode) Start() error { ...@@ -183,12 +180,14 @@ func (node *DataNode) Start() error {
} }
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
log.Println("Init insert channel names:", in.GetChannelNames())
Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...) Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...)
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
} }
func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
log.Println("DataNode current state:", node.State)
states := &internalpb2.ComponentStates{ states := &internalpb2.ComponentStates{
State: &internalpb2.ComponentInfo{ State: &internalpb2.ComponentInfo{
NodeID: Params.NodeID, NodeID: Params.NodeID,
......
...@@ -17,6 +17,7 @@ type ( ...@@ -17,6 +17,7 @@ type (
} }
AllocatorFactory struct { AllocatorFactory struct {
ID UniqueID
} }
MasterServiceFactory struct { MasterServiceFactory struct {
...@@ -161,9 +162,23 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa ...@@ -161,9 +162,23 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
return &collection return &collection
} }
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
f := &AllocatorFactory{}
if len(id) == 1 {
f.ID = id[0]
}
return f
}
func (alloc AllocatorFactory) setID(id UniqueID) {
alloc.ID = id
}
func (alloc AllocatorFactory) allocID() (UniqueID, error) { func (alloc AllocatorFactory) allocID() (UniqueID, error) {
// GOOSE TODO: random ID generate if alloc.ID == 0 {
return UniqueID(0), nil return UniqueID(0), nil // GOOSE TODO: random ID generating
}
return alloc.ID, nil
} }
func (m *MasterServiceFactory) setID(id UniqueID) { func (m *MasterServiceFactory) setID(id UniqueID) {
......
...@@ -112,7 +112,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { ...@@ -112,7 +112,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
case commonpb.MsgType_kDropPartition: case commonpb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
default: default:
log.Println("Non supporting message type:", msg.Type()) log.Println("Not supporting message type:", msg.Type())
} }
} }
...@@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { ...@@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
} }
default: default:
log.Println("..........default do nothing") log.Println(". default: do nothing ...")
} }
// generate binlog // generate binlog
...@@ -303,8 +303,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { ...@@ -303,8 +303,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
} }
ddNode.ddRecords.partitionRecords[partitionID] = nil ddNode.ddRecords.partitionRecords[partitionID] = nil
partitionTag := msg.PartitionName partitionName := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
metaOperateRecord{ metaOperateRecord{
createOrDrop: true, createOrDrop: true,
timestamp: msg.Base.Timestamp, timestamp: msg.Base.Timestamp,
...@@ -341,8 +341,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { ...@@ -341,8 +341,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
} }
delete(ddNode.ddRecords.partitionRecords, partitionID) delete(ddNode.ddRecords.partitionRecords, partitionID)
partitionTag := msg.PartitionName partitionName := msg.PartitionName
ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
metaOperateRecord{ metaOperateRecord{
createOrDrop: false, createOrDrop: false,
timestamp: msg.Base.Timestamp, timestamp: msg.Base.Timestamp,
......
...@@ -2,6 +2,7 @@ package datanode ...@@ -2,6 +2,7 @@ package datanode
import ( import (
"context" "context"
"log"
"testing" "testing"
"time" "time"
...@@ -35,51 +36,57 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ...@@ -35,51 +36,57 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
Params.MetaRootPath = testPath Params.MetaRootPath = testPath
Params.FlushDdBufferSize = 4 // Params.FlushDdBufferSize = 4
replica := newReplica() replica := newReplica()
idFactory := AllocatorFactory{} allocatorMock := NewAllocatorFactory()
ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, idFactory) ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock)
log.Print()
colID := UniqueID(0) collID := UniqueID(0)
colName := "col-test-0" collName := "col-test-0"
// create collection // create collection
createColReq := internalpb2.CreateCollectionRequest{ createCollReq := internalpb2.CreateCollectionRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateCollection, MsgType: commonpb.MsgType_kCreateCollection,
MsgID: 1, MsgID: 1,
Timestamp: 1, Timestamp: 1,
SourceID: 1, SourceID: 1,
}, },
CollectionID: colID, CollectionID: collID,
Schema: make([]byte, 0), Schema: make([]byte, 0),
CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
} }
createColMsg := msgstream.CreateCollectionMsg{ createCollMsg := msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{ BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(1), BeginTimestamp: Timestamp(1),
EndTimestamp: Timestamp(1), EndTimestamp: Timestamp(1),
HashValues: []uint32{uint32(0)}, HashValues: []uint32{uint32(0)},
}, },
CreateCollectionRequest: createColReq, CreateCollectionRequest: createCollReq,
} }
// drop collection // drop collection
dropColReq := internalpb2.DropCollectionRequest{ dropCollReq := internalpb2.DropCollectionRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDropCollection, MsgType: commonpb.MsgType_kDropCollection,
MsgID: 2, MsgID: 2,
Timestamp: 2, Timestamp: 2,
SourceID: 2, SourceID: 2,
}, },
CollectionID: colID, CollectionID: collID,
CollectionName: colName, CollectionName: collName,
DbName: "DbName",
DbID: UniqueID(0),
} }
dropColMsg := msgstream.DropCollectionMsg{ dropCollMsg := msgstream.DropCollectionMsg{
BaseMsg: msgstream.BaseMsg{ BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(2), BeginTimestamp: Timestamp(2),
EndTimestamp: Timestamp(2), EndTimestamp: Timestamp(2),
HashValues: []uint32{uint32(0)}, HashValues: []uint32{uint32(0)},
}, },
DropCollectionRequest: dropColReq, DropCollectionRequest: dropCollReq,
} }
partitionID := UniqueID(100) partitionID := UniqueID(100)
...@@ -92,10 +99,12 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ...@@ -92,10 +99,12 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
Timestamp: 3, Timestamp: 3,
SourceID: 3, SourceID: 3,
}, },
CollectionID: colID, CollectionID: collID,
PartitionID: partitionID, PartitionID: partitionID,
CollectionName: colName, CollectionName: collName,
PartitionName: partitionName, PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
} }
createPartitionMsg := msgstream.CreatePartitionMsg{ createPartitionMsg := msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{ BaseMsg: msgstream.BaseMsg{
...@@ -114,10 +123,12 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ...@@ -114,10 +123,12 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
Timestamp: 4, Timestamp: 4,
SourceID: 4, SourceID: 4,
}, },
CollectionID: colID, CollectionID: collID,
PartitionID: partitionID, PartitionID: partitionID,
CollectionName: colName, CollectionName: collName,
PartitionName: partitionName, PartitionName: partitionName,
DbName: "DbName",
DbID: UniqueID(0),
} }
dropPartitionMsg := msgstream.DropPartitionMsg{ dropPartitionMsg := msgstream.DropPartitionMsg{
BaseMsg: msgstream.BaseMsg{ BaseMsg: msgstream.BaseMsg{
...@@ -128,16 +139,17 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ...@@ -128,16 +139,17 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
DropPartitionRequest: dropPartitionReq, DropPartitionRequest: dropPartitionReq,
} }
replica.addSegment(1, collID, partitionID, make([]*internalpb2.MsgPosition, 0))
inFlushCh <- &flushMsg{ inFlushCh <- &flushMsg{
msgID: 1, msgID: 5,
timestamp: 6, timestamp: 5,
segmentIDs: []UniqueID{1}, segmentIDs: []UniqueID{1},
collectionID: UniqueID(1), collectionID: collID,
} }
tsMessages := make([]msgstream.TsMsg, 0) tsMessages := make([]msgstream.TsMsg, 0)
tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0)) msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0))
......
...@@ -27,6 +27,7 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ ...@@ -27,6 +27,7 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ
} }
func (mService *metaService) init() { func (mService *metaService) init() {
log.Println("Initing meta ...")
err := mService.loadCollections() err := mService.loadCollections()
if err != nil { if err != nil {
log.Fatal("metaService init failed:", err) log.Fatal("metaService init failed:", err)
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"sync" "sync"
dn "github.com/zilliztech/milvus-distributed/internal/datanode" dn "github.com/zilliztech/milvus-distributed/internal/datanode"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -67,11 +66,6 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { ...@@ -67,11 +66,6 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
} }
func (s *Server) Init() error { func (s *Server) Init() error {
err := s.core.Init()
if err != nil {
return errors.Errorf("Init failed: %v", err)
}
return s.core.Init() return s.core.Init()
} }
......
package proxyservice package proxyservice
import ( import (
"context" "sync"
"github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/allocator"
...@@ -17,22 +17,21 @@ type NodeIDAllocator interface { ...@@ -17,22 +17,21 @@ type NodeIDAllocator interface {
type NaiveNodeIDAllocatorImpl struct { type NaiveNodeIDAllocatorImpl struct {
impl *allocator.IDAllocator impl *allocator.IDAllocator
now UniqueID
mtx sync.Mutex
} }
func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID { func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
id, err := allocator.impl.AllocOne() allocator.mtx.Lock()
if err != nil { defer func() {
panic(err) allocator.now++
} allocator.mtx.Unlock()
return id }()
return allocator.now
} }
func NewNodeIDAllocator() NodeIDAllocator { func NewNodeIDAllocator() NodeIDAllocator {
impl, err := allocator.NewIDAllocator(context.Background(), Params.MasterAddress())
if err != nil {
panic(err)
}
return &NaiveNodeIDAllocatorImpl{ return &NaiveNodeIDAllocatorImpl{
impl: impl, now: 0,
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册