未验证 提交 921ef0f2 编写于 作者: B bigsheeper 提交者: GitHub

Fix seek query channel error (#10723)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 87042932
......@@ -116,6 +116,11 @@ test-proxy:
go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/proxy -v
test-querycoord:
@echo "Running go unittests..."
go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/querycoord -v
test-go: build-cpp-with-unittest
@echo "Running go unittests..."
@(env bash $(PWD)/scripts/run_go_codecov.sh)
......
......@@ -37,6 +37,7 @@ const (
segmentMetaPrefix = "queryCoord-segmentMeta"
queryChannelMetaPrefix = "queryCoord-queryChannel"
sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo"
globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition"
)
type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo
......@@ -100,6 +101,7 @@ type MetaReplica struct {
queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex
globalSeekPosition *internalpb.MsgPosition
//partitionStates map[UniqueID]*querypb.PartitionStates
}
......@@ -109,6 +111,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
queryMsgStream := make(map[UniqueID]msgstream.MsgStream)
position := &internalpb.MsgPosition{}
m := &MetaReplica{
ctx: childCtx,
......@@ -117,10 +120,11 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
msFactory: factory,
idAllocator: idAllocator,
collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
queryStreams: queryMsgStream,
collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
queryStreams: queryMsgStream,
globalSeekPosition: position,
}
err := m.reloadFromKV()
......@@ -182,6 +186,15 @@ func (m *MetaReplica) reloadFromKV() error {
}
m.queryChannelInfos[collectionID] = queryChannelInfo
}
globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix)
if err == nil {
position := &internalpb.MsgPosition{}
err = proto.Unmarshal([]byte(globalSeekPosValue), position)
if err != nil {
return err
}
m.globalSeekPosition = position
}
//TODO::update partition states
return nil
......@@ -399,6 +412,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
}
queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
var globalSeekPositionTmp *internalpb.MsgPosition
for collectionID, segmentChangeInfos := range col2SegmentChangeInfos {
// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos)
......@@ -438,6 +452,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
}
queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
queryChannelInfosMap[collectionID] = queryChannelInfo
globalSeekPositionTmp = queryChannelInfo.SeekPosition
}
saveKvs := make(map[string]string)
......@@ -460,6 +475,11 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
}
seekPos, err := proto.Marshal(globalSeekPositionTmp)
if err != nil {
return col2SegmentChangeInfos, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed
......@@ -474,7 +494,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
saveKvs[changeInfoKey] = string(changeInfoBytes)
}
err := m.client.MultiSave(saveKvs)
err = m.client.MultiSave(saveKvs)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err))
return col2SegmentChangeInfos, err
......@@ -492,6 +512,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
for collectionID, channelInfo := range queryChannelInfosMap {
m.queryChannelInfos[collectionID] = channelInfo
}
m.globalSeekPosition = globalSeekPositionTmp
m.channelMu.Unlock()
return col2SegmentChangeInfos, nil
......@@ -563,6 +584,11 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
}
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
seekPos, err := proto.Marshal(queryChannelInfo.SeekPosition)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed
......@@ -594,6 +620,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
m.channelMu.Lock()
m.queryChannelInfos[collectionID] = queryChannelInfo
m.globalSeekPosition = queryChannelInfo.SeekPosition
m.channelMu.Unlock()
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
......@@ -890,6 +917,10 @@ func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.Q
// set info.collectionID from 0 to realID
info.CollectionID = collectionID
m.queryChannelInfos[collectionID] = info
info.SeekPosition = m.globalSeekPosition
if info.SeekPosition != nil {
info.SeekPosition.ChannelName = info.QueryChannelID
}
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
}
......
......@@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
......@@ -50,6 +51,10 @@ func (tk *testKv) LoadWithPrefix(key string) ([]string, []string, error) {
return nil, nil, nil
}
func (tk *testKv) Load(key string) (string, error) {
return "", nil
}
func TestReplica_Release(t *testing.T) {
refreshParams()
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
......@@ -85,10 +90,11 @@ func TestMetaFunc(t *testing.T) {
kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
assert.Nil(t, err)
meta := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: map[UniqueID]*querypb.SegmentInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: map[UniqueID]*querypb.SegmentInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
globalSeekPosition: &internalpb.MsgPosition{},
}
nodeID := int64(100)
......@@ -342,4 +348,20 @@ func TestReloadMetaFromKV(t *testing.T) {
assert.Equal(t, true, ok)
_, ok = meta.queryChannelInfos[defaultCollectionID]
assert.Equal(t, true, ok)
t.Run("test no global query seek position", func(t *testing.T) {
err = kv.Remove(globalQuerySeekPositionPrefix)
assert.NoError(t, err)
err = meta.reloadFromKV()
assert.NoError(t, err)
})
t.Run("test wrong global query seek position", func(t *testing.T) {
err = kv.Save(globalQuerySeekPositionPrefix, "&%*&^*^(&%*&%&^%")
assert.NoError(t, err)
err = meta.reloadFromKV()
assert.Error(t, err)
})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册