提交 05b013c5 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Fix datanode bug

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 947976db
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
const ( const (
...@@ -38,7 +39,6 @@ type ( ...@@ -38,7 +39,6 @@ type (
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
SetMasterServiceInterface(ms MasterServiceInterface) error SetMasterServiceInterface(ms MasterServiceInterface) error
SetDataServiceInterface(ds DataServiceInterface) error SetDataServiceInterface(ds DataServiceInterface) error
} }
...@@ -55,7 +55,6 @@ type ( ...@@ -55,7 +55,6 @@ type (
} }
DataNode struct { DataNode struct {
// GOOSE TODO: complete interface with component
ctx context.Context ctx context.Context
NodeID UniqueID NodeID UniqueID
Role string Role string
...@@ -80,8 +79,8 @@ func NewDataNode(ctx context.Context) *DataNode { ...@@ -80,8 +79,8 @@ func NewDataNode(ctx context.Context) *DataNode {
Params.Init() Params.Init()
node := &DataNode{ node := &DataNode{
ctx: ctx, ctx: ctx,
NodeID: Params.NodeID, // GOOSE TODO NodeID: Params.NodeID, // GOOSE TODO
Role: "DataNode", // GOOSE TODO Role: typeutil.DataNodeRole, // GOOSE TODO
State: internalpb2.StateCode_INITIALIZING, State: internalpb2.StateCode_INITIALIZING,
dataSyncService: nil, dataSyncService: nil,
metaService: nil, metaService: nil,
...@@ -118,11 +117,10 @@ func (node *DataNode) Init() error { ...@@ -118,11 +117,10 @@ func (node *DataNode) Init() error {
resp, err := node.dataService.RegisterNode(req) resp, err := node.dataService.RegisterNode(req)
if err != nil { if err != nil {
return errors.Errorf("Init failed: %v", err) return errors.Errorf("Register node failed: %v", err)
} }
for _, kv := range resp.InitParams.StartParams { for _, kv := range resp.InitParams.StartParams {
log.Println(kv)
switch kv.Key { switch kv.Key {
case "DDChannelName": case "DDChannelName":
Params.DDChannelNames = []string{kv.Value} Params.DDChannelNames = []string{kv.Value}
...@@ -150,7 +148,7 @@ func (node *DataNode) Init() error { ...@@ -150,7 +148,7 @@ func (node *DataNode) Init() error {
node.metaService = newMetaService(node.ctx, replica, node.masterService) node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica node.replica = replica
// Opentracing // --- Opentracing ---
cfg := &config.Configuration{ cfg := &config.Configuration{
ServiceName: "data_node", ServiceName: "data_node",
Sampler: &config.SamplerConfig{ Sampler: &config.SamplerConfig{
...@@ -167,7 +165,6 @@ func (node *DataNode) Init() error { ...@@ -167,7 +165,6 @@ func (node *DataNode) Init() error {
} }
node.tracer = tracer node.tracer = tracer
node.closer = closer node.closer = closer
opentracing.SetGlobalTracer(node.tracer) opentracing.SetGlobalTracer(node.tracer)
return nil return nil
......
...@@ -27,6 +27,7 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ ...@@ -27,6 +27,7 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ
} }
func (mService *metaService) init() { func (mService *metaService) init() {
log.Println("Initing meta ...")
err := mService.loadCollections() err := mService.loadCollections()
if err != nil { if err != nil {
log.Fatal("metaService init failed:", err) log.Fatal("metaService init failed:", err)
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"sync" "sync"
dn "github.com/zilliztech/milvus-distributed/internal/datanode" dn "github.com/zilliztech/milvus-distributed/internal/datanode"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -67,11 +66,6 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { ...@@ -67,11 +66,6 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
} }
func (s *Server) Init() error { func (s *Server) Init() error {
err := s.core.Init()
if err != nil {
return errors.Errorf("Init failed: %v", err)
}
return s.core.Init() return s.core.Init()
} }
......
...@@ -77,6 +77,26 @@ func (qs *QueryService) Stop() error { ...@@ -77,6 +77,26 @@ func (qs *QueryService) Stop() error {
return nil return nil
} }
//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetDataService(d)
// if err != nil {
// return err
// }
// }
// return nil
//}
//
//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetIndexService(i)
// if err != nil {
// return err
// }
// }
// return nil
//}
func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) {
serviceComponentInfo := &internalpb2.ComponentInfo{ serviceComponentInfo := &internalpb2.ComponentInfo{
NodeID: Params.QueryServiceID, NodeID: Params.QueryServiceID,
...@@ -114,7 +134,6 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { ...@@ -114,7 +134,6 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
// TODO:: do addWatchDmChannel to query node after registerNode // TODO:: do addWatchDmChannel to query node after registerNode
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
fmt.Println("register query node =", req.Address) fmt.Println("register query node =", req.Address)
// TODO:: add mutex
allocatedID := qs.numRegisterNode allocatedID := qs.numRegisterNode
qs.numRegisterNode++ qs.numRegisterNode++
...@@ -139,7 +158,6 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb ...@@ -139,7 +158,6 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
} }
qs.queryNodes = append(qs.queryNodes, node) qs.queryNodes = append(qs.queryNodes, node)
// TODO:: watch dm channels
return &querypb.RegisterNodeResponse{ return &querypb.RegisterNodeResponse{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS, ErrorCode: commonpb.ErrorCode_SUCCESS,
...@@ -271,7 +289,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm ...@@ -271,7 +289,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
segmentIDs := showSegmentResponse.SegmentIDs segmentIDs := showSegmentResponse.SegmentIDs
segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse)
channel2id := make(map[string]int) channel2id := make(map[string]int)
//id2channels := make(map[int][]string) id2channels := make(map[int][]string)
id2segs := make(map[int][]UniqueID) id2segs := make(map[int][]UniqueID)
offset := 0 offset := 0
...@@ -288,16 +306,13 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm ...@@ -288,16 +306,13 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
for i, str := range state.StartPositions { for i, str := range state.StartPositions {
flatChannelName += str.ChannelName flatChannelName += str.ChannelName
channelNames = append(channelNames, str.ChannelName) channelNames = append(channelNames, str.ChannelName)
if i+1 < len(state.StartPositions) { if i < len(state.StartPositions) {
flatChannelName += "/" flatChannelName += "/"
} }
} }
if flatChannelName == "" {
log.Fatal("segmentState's channel name is empty")
}
if _, ok := channel2id[flatChannelName]; !ok { if _, ok := channel2id[flatChannelName]; !ok {
channel2id[flatChannelName] = offset channel2id[flatChannelName] = offset
//id2channels[offset] = channelNames id2channels[offset] = channelNames
id2segs[offset] = make([]UniqueID, 0) id2segs[offset] = make([]UniqueID, 0)
id2segs[offset] = append(id2segs[offset], segmentID) id2segs[offset] = append(id2segs[offset], segmentID)
offset++ offset++
...@@ -314,7 +329,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm ...@@ -314,7 +329,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if segmentStates[v].State == datapb.SegmentState_SegmentFlushed { if segmentStates[v].State == datapb.SegmentState_SegmentFlushed {
selectedSegs = append(selectedSegs, v) selectedSegs = append(selectedSegs, v)
} else { } else {
if i > 0 && segmentStates[selectedSegs[i-1]].State != datapb.SegmentState_SegmentFlushed { if i > 0 && segmentStates[v-1].State != datapb.SegmentState_SegmentFlushed {
break break
} }
selectedSegs = append(selectedSegs, v) selectedSegs = append(selectedSegs, v)
......
...@@ -2,148 +2,11 @@ package queryservice ...@@ -2,148 +2,11 @@ package queryservice
import ( import (
"context" "context"
"strconv"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
) )
type masterMock struct {
collectionIDs []UniqueID
col2partition map[UniqueID][]UniqueID
partition2segment map[UniqueID][]UniqueID
}
func newMasterMock() *masterMock {
collectionIDs := make([]UniqueID, 0)
collectionIDs = append(collectionIDs, 1)
col2partition := make(map[UniqueID][]UniqueID)
partitionIDs := make([]UniqueID, 0)
partitionIDs = append(partitionIDs, 1)
col2partition[1] = partitionIDs
partition2segment := make(map[UniqueID][]UniqueID)
segmentIDs := make([]UniqueID, 0)
segmentIDs = append(segmentIDs, 1)
segmentIDs = append(segmentIDs, 2)
segmentIDs = append(segmentIDs, 3)
segmentIDs = append(segmentIDs, 4)
segmentIDs = append(segmentIDs, 5)
segmentIDs = append(segmentIDs, 6)
partition2segment[1] = segmentIDs
return &masterMock{
collectionIDs: collectionIDs,
col2partition: col2partition,
partition2segment: partition2segment,
}
}
func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
collectionID := in.CollectionID
partitionIDs := make([]UniqueID, 0)
for _, id := range master.collectionIDs {
if id == collectionID {
partitions := master.col2partition[collectionID]
partitionIDs = append(partitionIDs, partitions...)
}
}
response := &milvuspb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
PartitionIDs: partitionIDs,
}
return response, nil
}
func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
collectionID := in.CollectionID
partitionID := in.PartitionID
for _, id := range master.collectionIDs {
if id == collectionID {
partitions := master.col2partition[collectionID]
for _, partition := range partitions {
if partition == partitionID {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
SegmentIDs: master.partition2segment[partition],
}, nil
}
}
}
}
return nil, errors.New("collection id or partition id not found")
}
type dataMock struct {
segmentIDs []UniqueID
segmentStates map[UniqueID]*datapb.SegmentStatesResponse
}
func newDataMock() *dataMock {
positions1 := make([]*internalpb2.MsgPosition, 0)
positions2 := make([]*internalpb2.MsgPosition, 0)
positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)})
positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)})
positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)})
positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)})
segmentIDs := make([]UniqueID, 0)
segmentIDs = append(segmentIDs, 1)
segmentIDs = append(segmentIDs, 2)
segmentIDs = append(segmentIDs, 3)
segmentIDs = append(segmentIDs, 4)
segmentIDs = append(segmentIDs, 5)
segmentIDs = append(segmentIDs, 6)
fillStates := func(time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStatesResponse {
return &datapb.SegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
State: state,
CreateTime: time,
StartPositions: position,
}
}
segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse)
segmentStates[1] = fillStates(1, positions1, datapb.SegmentState_SegmentFlushed)
segmentStates[2] = fillStates(2, positions2, datapb.SegmentState_SegmentFlushed)
segmentStates[3] = fillStates(3, positions1, datapb.SegmentState_SegmentFlushed)
segmentStates[4] = fillStates(4, positions2, datapb.SegmentState_SegmentFlushed)
segmentStates[5] = fillStates(5, positions1, datapb.SegmentState_SegmentGrowing)
segmentStates[6] = fillStates(6, positions2, datapb.SegmentState_SegmentGrowing)
return &dataMock{
segmentIDs: segmentIDs,
segmentStates: segmentStates,
}
}
func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
segmentID := req.SegmentID
for _, id := range data.segmentIDs {
if segmentID == id {
return data.segmentStates[id], nil
}
}
return nil, errors.New("segment id not found")
}
func TestQueryService_Init(t *testing.T) { func TestQueryService_Init(t *testing.T) {
service, err := NewQueryService(context.Background()) service, err := NewQueryService(context.Background())
assert.Nil(t, err) assert.Nil(t, err)
...@@ -171,35 +34,3 @@ func TestQueryService_Init(t *testing.T) { ...@@ -171,35 +34,3 @@ func TestQueryService_Init(t *testing.T) {
service.Stop() service.Stop()
} }
func TestQueryService_load(t *testing.T) {
service, err := NewQueryService(context.Background())
assert.Nil(t, err)
service.Init()
service.Start()
service.SetMasterService(newMasterMock())
service.SetDataService(newDataMock())
registerNodeRequest := &querypb.RegisterNodeRequest{
Address: &commonpb.Address{},
}
service.RegisterNode(registerNodeRequest)
t.Run("Test LoadSegment", func(t *testing.T) {
loadCollectionRequest := &querypb.LoadCollectionRequest{
CollectionID: 1,
}
response, err := service.LoadCollection(loadCollectionRequest)
assert.Nil(t, err)
assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS)
})
t.Run("Test LoadPartition", func(t *testing.T) {
loadPartitionRequest := &querypb.LoadPartitionRequest{
CollectionID: 1,
PartitionIDs: []UniqueID{1},
}
response, err := service.LoadPartitions(loadPartitionRequest)
assert.Nil(t, err)
assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS)
})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册