未验证 提交 46e0e265 编写于 作者: W wayblink 提交者: GitHub

Move datacoord related methods from meta to globalMetaBroker (#17812)

Signed-off-by: Nwayblink <anyang.wang@zilliz.com>
上级 6840486e
...@@ -49,7 +49,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) { ...@@ -49,7 +49,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(baseCtx, kv, nil, idAllocator, nil) meta, err := newMeta(baseCtx, kv, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
var cluster Cluster = &queryNodeCluster{ var cluster Cluster = &queryNodeCluster{
ctx: baseCtx, ctx: baseCtx,
......
...@@ -376,7 +376,7 @@ func TestReloadClusterFromKV(t *testing.T) { ...@@ -376,7 +376,7 @@ func TestReloadClusterFromKV(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(ctx, kv, factory, idAllocator, nil) meta, err := newMeta(ctx, kv, factory, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
cluster := &queryNodeCluster{ cluster := &queryNodeCluster{
...@@ -425,7 +425,7 @@ func TestGrpcRequest(t *testing.T) { ...@@ -425,7 +425,7 @@ func TestGrpcRequest(t *testing.T) {
idAllocator := func() (UniqueID, error) { idAllocator := func() (UniqueID, error) {
return 0, nil return 0, nil
} }
meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
deltaChannelInfo := []*datapb.VchannelInfo{ deltaChannelInfo := []*datapb.VchannelInfo{
{ {
...@@ -631,7 +631,7 @@ func TestSetNodeState(t *testing.T) { ...@@ -631,7 +631,7 @@ func TestSetNodeState(t *testing.T) {
idAllocator := func() (UniqueID, error) { idAllocator := func() (UniqueID, error) {
return 0, nil return 0, nil
} }
meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
......
...@@ -531,3 +531,29 @@ func (broker *globalMetaBroker) releaseSegmentReferLock(ctx context.Context, tas ...@@ -531,3 +531,29 @@ func (broker *globalMetaBroker) releaseSegmentReferLock(ctx context.Context, tas
return nil return nil
} }
// getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord
func (broker *globalMetaBroker) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
var segmentInfos []*datapb.SegmentInfo
infoResp, err := broker.dataCoord.GetSegmentInfo(broker.ctx, &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
},
SegmentIDs: segmentIds,
IncludeUnHealthy: true,
})
if err != nil {
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(infoResp.GetStatus().Reason)
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
segmentInfos = infoResp.Infos
return segmentInfos, nil
}
...@@ -159,3 +159,31 @@ func TestGlobalMetaBroker_IndexCoord(t *testing.T) { ...@@ -159,3 +159,31 @@ func TestGlobalMetaBroker_IndexCoord(t *testing.T) {
cancel() cancel()
} }
func TestGetDataSegmentInfosByIDs(t *testing.T) {
refreshParams()
ctx, cancel := context.WithCancel(context.Background())
dataCoord := newDataCoordMock(ctx)
cm := storage.NewLocalChunkManager(storage.RootPath(globalMetaTestDir))
defer cm.RemoveWithPrefix("")
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, cm)
assert.Nil(t, err)
segmentInfos, err := handler.getDataSegmentInfosByIDs([]int64{1})
assert.Nil(t, err)
assert.Equal(t, 1, len(segmentInfos))
dataCoord.returnError = true
segmentInfos2, err := handler.getDataSegmentInfosByIDs([]int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos2)
dataCoord.returnError = false
dataCoord.returnGrpcError = true
segmentInfos3, err := handler.getDataSegmentInfosByIDs([]int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
cancel()
}
...@@ -47,7 +47,7 @@ func TestReloadFromKV(t *testing.T) { ...@@ -47,7 +47,7 @@ func TestReloadFromKV(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(baseCtx, kv, nil, idAllocator, nil) meta, err := newMeta(baseCtx, kv, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
segmentInfo := &querypb.SegmentInfo{ segmentInfo := &querypb.SegmentInfo{
...@@ -109,7 +109,7 @@ func TestCheckIndexLoop(t *testing.T) { ...@@ -109,7 +109,7 @@ func TestCheckIndexLoop(t *testing.T) {
return newID, nil return newID, nil
} }
meta, err := newMeta(ctx, kv, nil, idAllocator, nil) meta, err := newMeta(ctx, kv, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
rootCoord := newRootCoordMock(ctx) rootCoord := newRootCoordMock(ctx)
...@@ -184,7 +184,7 @@ func TestHandoffNotExistSegment(t *testing.T) { ...@@ -184,7 +184,7 @@ func TestHandoffNotExistSegment(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(ctx, kv, nil, idAllocator, nil) meta, err := newMeta(ctx, kv, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
rootCoord := newRootCoordMock(ctx) rootCoord := newRootCoordMock(ctx)
...@@ -243,7 +243,7 @@ func TestProcessHandoffAfterIndexDone(t *testing.T) { ...@@ -243,7 +243,7 @@ func TestProcessHandoffAfterIndexDone(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(ctx, kv, nil, idAllocator, nil) meta, err := newMeta(ctx, kv, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
taskScheduler := &TaskScheduler{ taskScheduler := &TaskScheduler{
ctx: ctx, ctx: ctx,
......
...@@ -106,8 +106,6 @@ type Meta interface { ...@@ -106,8 +106,6 @@ type Meta interface {
getReplicasByNodeID(nodeID int64) ([]*milvuspb.ReplicaInfo, error) getReplicasByNodeID(nodeID int64) ([]*milvuspb.ReplicaInfo, error)
applyReplicaBalancePlan(p *balancePlan) error applyReplicaBalancePlan(p *balancePlan) error
updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error
getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error)
} }
// MetaReplica records the current load information on all querynodes // MetaReplica records the current load information on all querynodes
...@@ -137,7 +135,7 @@ type MetaReplica struct { ...@@ -137,7 +135,7 @@ type MetaReplica struct {
dataCoord types.DataCoord dataCoord types.DataCoord
} }
func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAllocator func() (UniqueID, error), dataCoord types.DataCoord) (Meta, error) { func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAllocator func() (UniqueID, error)) (Meta, error) {
childCtx, cancel := context.WithCancel(ctx) childCtx, cancel := context.WithCancel(ctx)
collectionInfos := make(map[UniqueID]*querypb.CollectionInfo) collectionInfos := make(map[UniqueID]*querypb.CollectionInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo) queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
...@@ -157,7 +155,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl ...@@ -157,7 +155,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl
segmentsInfo: newSegmentsInfo(kv), segmentsInfo: newSegmentsInfo(kv),
replicas: NewReplicaInfos(), replicas: NewReplicaInfos(),
dataCoord: dataCoord,
} }
m.setKvClient(kv) m.setKvClient(kv)
...@@ -1338,29 +1335,3 @@ func addNode2Segment(meta Meta, node UniqueID, replicas []*milvuspb.ReplicaInfo, ...@@ -1338,29 +1335,3 @@ func addNode2Segment(meta Meta, node UniqueID, replicas []*milvuspb.ReplicaInfo,
segment.NodeIds = append(segment.NodeIds, node) segment.NodeIds = append(segment.NodeIds, node)
} }
// getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord
func (m *MetaReplica) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
var segmentInfos []*datapb.SegmentInfo
infoResp, err := m.dataCoord.GetSegmentInfo(m.ctx, &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
},
SegmentIDs: segmentIds,
IncludeUnHealthy: true,
})
if err != nil {
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(infoResp.GetStatus().Reason)
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
segmentInfos = infoResp.Infos
return segmentInfos, nil
}
...@@ -75,7 +75,7 @@ func TestReplica_Release(t *testing.T) { ...@@ -75,7 +75,7 @@ func TestReplica_Release(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(context.Background(), etcdKV, nil, idAllocator, nil) meta, err := newMeta(context.Background(), etcdKV, nil, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.addCollection(1, querypb.LoadType_LoadCollection, nil) err = meta.addCollection(1, querypb.LoadType_LoadCollection, nil)
require.NoError(t, err) require.NoError(t, err)
...@@ -424,25 +424,3 @@ func TestCreateQueryChannel(t *testing.T) { ...@@ -424,25 +424,3 @@ func TestCreateQueryChannel(t *testing.T) {
}) })
} }
} }
func TestGetDataSegmentInfosByIDs(t *testing.T) {
dataCoord := &dataCoordMock{}
meta := &MetaReplica{
dataCoord: dataCoord,
}
segmentInfos, err := meta.getDataSegmentInfosByIDs([]int64{1})
assert.Nil(t, err)
assert.Equal(t, 1, len(segmentInfos))
dataCoord.returnError = true
segmentInfos2, err := meta.getDataSegmentInfosByIDs([]int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos2)
dataCoord.returnError = false
dataCoord.returnGrpcError = true
segmentInfos3, err := meta.getDataSegmentInfosByIDs([]int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
}
...@@ -159,7 +159,7 @@ func (qc *QueryCoord) Init() error { ...@@ -159,7 +159,7 @@ func (qc *QueryCoord) Init() error {
qc.factory.Init(&Params) qc.factory.Init(&Params)
// init meta // init meta
qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator, qc.dataCoordClient) qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator)
if initError != nil { if initError != nil {
log.Error("query coordinator init meta failed", zap.Error(initError)) log.Error("query coordinator init meta failed", zap.Error(initError))
return return
......
...@@ -50,7 +50,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) { ...@@ -50,7 +50,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
newID := atomic.AddInt64(&id, 1) newID := atomic.AddInt64(&id, 1)
return newID, nil return newID, nil
} }
meta, err := newMeta(baseCtx, kv, factory, idAllocator, nil) meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err) assert.Nil(t, err)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -483,7 +483,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { ...@@ -483,7 +483,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
ReplicaID: replica.GetReplicaID(), ReplicaID: replica.GetReplicaID(),
} }
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.meta, watchRequest) fullWatchRequest, err := generateFullWatchDmChannelsRequest(lct.broker, watchRequest)
if err != nil { if err != nil {
lct.setResultInfo(err) lct.setResultInfo(err)
return err return err
...@@ -915,7 +915,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { ...@@ -915,7 +915,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
ReplicaID: replica.GetReplicaID(), ReplicaID: replica.GetReplicaID(),
} }
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.meta, watchRequest) fullWatchRequest, err := generateFullWatchDmChannelsRequest(lpt.broker, watchRequest)
if err != nil { if err != nil {
lpt.setResultInfo(err) lpt.setResultInfo(err)
return err return err
...@@ -2035,7 +2035,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro ...@@ -2035,7 +2035,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
watchRequest.PartitionIDs = toRecoverPartitionIDs watchRequest.PartitionIDs = toRecoverPartitionIDs
} }
fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.meta, watchRequest) fullWatchRequest, err := generateFullWatchDmChannelsRequest(lbt.broker, watchRequest)
if err != nil { if err != nil {
lbt.setResultInfo(err) lbt.setResultInfo(err)
return err return err
......
...@@ -359,7 +359,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, ...@@ -359,7 +359,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if err != nil { if err != nil {
return nil, err return nil, err
} }
fullReq, err := generateFullWatchDmChannelsRequest(scheduler.meta, &req) fullReq, err := generateFullWatchDmChannelsRequest(scheduler.broker, &req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -200,13 +200,13 @@ func TestUnMarshalTask(t *testing.T) { ...@@ -200,13 +200,13 @@ func TestUnMarshalTask(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
baseCtx, cancel := context.WithCancel(context.Background()) baseCtx, cancel := context.WithCancel(context.Background())
dataCoord := &dataCoordMock{} dataCoord := &dataCoordMock{}
meta := &MetaReplica{ broker, err := newGlobalMetaBroker(baseCtx, nil, dataCoord, nil, nil)
dataCoord: dataCoord, assert.Nil(t, err)
}
taskScheduler := &TaskScheduler{ taskScheduler := &TaskScheduler{
ctx: baseCtx, ctx: baseCtx,
cancel: cancel, cancel: cancel,
meta: meta, broker: broker,
} }
t.Run("Test loadCollectionTask", func(t *testing.T) { t.Run("Test loadCollectionTask", func(t *testing.T) {
......
...@@ -25,7 +25,7 @@ import ( ...@@ -25,7 +25,7 @@ import (
) )
// generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from Meta // generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from Meta
func generateFullWatchDmChannelsRequest(m Meta, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) { func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) {
cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest) cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest)
vChannels := cloned.GetInfos() vChannels := cloned.GetInfos()
...@@ -36,7 +36,7 @@ func generateFullWatchDmChannelsRequest(m Meta, request *querypb.WatchDmChannels ...@@ -36,7 +36,7 @@ func generateFullWatchDmChannelsRequest(m Meta, request *querypb.WatchDmChannels
segmentIds = append(segmentIds, vChannel.UnflushedSegmentIds...) segmentIds = append(segmentIds, vChannel.UnflushedSegmentIds...)
segmentIds = append(segmentIds, vChannel.DroppedSegmentIds...) segmentIds = append(segmentIds, vChannel.DroppedSegmentIds...)
} }
segmentInfos, err := m.getDataSegmentInfosByIDs(segmentIds) segmentInfos, err := broker.getDataSegmentInfosByIDs(segmentIds)
if err != nil { if err != nil {
log.Error("Get Vchannel SegmentInfos failed", zap.Error(err)) log.Error("Get Vchannel SegmentInfos failed", zap.Error(err))
return nil, err return nil, err
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package querycoord package querycoord
import ( import (
"context"
"testing" "testing"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
...@@ -27,9 +28,9 @@ import ( ...@@ -27,9 +28,9 @@ import (
func TestGenerateFullWatchDmChannelsRequest(t *testing.T) { func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
dataCoord := &dataCoordMock{} dataCoord := &dataCoordMock{}
meta := &MetaReplica{ ctx, cancel := context.WithCancel(context.Background())
dataCoord: dataCoord, handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil)
} assert.Nil(t, err)
deltaChannel := &datapb.VchannelInfo{ deltaChannel := &datapb.VchannelInfo{
CollectionID: defaultCollectionID, CollectionID: defaultCollectionID,
...@@ -45,14 +46,16 @@ func TestGenerateFullWatchDmChannelsRequest(t *testing.T) { ...@@ -45,14 +46,16 @@ func TestGenerateFullWatchDmChannelsRequest(t *testing.T) {
NodeID: 1, NodeID: 1,
} }
fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(meta, watchDmChannelsRequest) fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos()) assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos())
dataCoord.returnError = true dataCoord.returnError = true
fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(meta, watchDmChannelsRequest) fullWatchDmChannelsRequest2, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest)
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, fullWatchDmChannelsRequest2.GetSegmentInfos()) assert.Empty(t, fullWatchDmChannelsRequest2.GetSegmentInfos())
cancel()
} }
func TestThinWatchDmChannelsRequest(t *testing.T) { func TestThinWatchDmChannelsRequest(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册