未验证 提交 2e676a4b 编写于 作者: B Bingyi Sun 提交者: GitHub

Extract segments info into a new struct. (#15537)

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Co-authored-by: Nsunby <bingyi.sun@zilliz.com>
上级 5751759c
......@@ -102,8 +102,6 @@ type MetaReplica struct {
//sync.RWMutex
collectionInfos map[UniqueID]*querypb.CollectionInfo
collectionMu sync.RWMutex
segmentInfos map[UniqueID]*querypb.SegmentInfo
segmentMu sync.RWMutex
queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo
channelMu sync.RWMutex
deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo
......@@ -113,13 +111,13 @@ type MetaReplica struct {
queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex
segmentsInfo *segmentsInfo
//partitionStates map[UniqueID]*querypb.PartitionStates
}
func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAllocator func() (UniqueID, error)) (Meta, error) {
childCtx, cancel := context.WithCancel(ctx)
collectionInfos := make(map[UniqueID]*querypb.CollectionInfo)
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo)
dmChannelInfos := make(map[string]*querypb.DmChannelWatchInfo)
......@@ -133,11 +131,12 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
idAllocator: idAllocator,
collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
deltaChannelInfos: deltaChannelInfos,
dmChannelInfos: dmChannelInfos,
queryStreams: queryMsgStream,
segmentsInfo: newSegmentsInfo(kv),
}
err := m.reloadFromKV()
......@@ -167,22 +166,9 @@ func (m *MetaReplica) reloadFromKV() error {
m.collectionInfos[collectionID] = collectionInfo
}
segmentKeys, segmentValues, err := m.client.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil {
if err := m.segmentsInfo.loadSegments(); err != nil {
return err
}
for index := range segmentKeys {
segmentID, err := strconv.ParseInt(filepath.Base(segmentKeys[index]), 10, 64)
if err != nil {
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.Unmarshal([]byte(segmentValues[index]), segmentInfo)
if err != nil {
return err
}
m.segmentInfos[segmentID] = segmentInfo
}
deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil {
......@@ -522,29 +508,17 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
}
// save segmentInfo to etcd
segmentInfoKvs := make(map[string]string)
for _, infos := range saves {
for _, info := range infos {
segmentInfoBytes, err := proto.Marshal(info)
if err != nil {
return col2SegmentChangeInfos, err
if err := m.segmentsInfo.saveSegment(info); err != nil {
panic(err)
}
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
}
}
for key, value := range segmentInfoKvs {
err := m.client.Save(key, value)
if err != nil {
panic(err)
}
}
// remove compacted segment info from etcd
for _, segmentInfo := range segmentsCompactionFrom {
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
err := m.client.Remove(segmentKey)
if err != nil {
if err := m.segmentsInfo.removeSegment(segmentInfo); err != nil {
panic(err)
}
}
......@@ -569,18 +543,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
panic(err)
}
m.segmentMu.Lock()
for _, segmentInfos := range saves {
for _, info := range segmentInfos {
segmentID := info.SegmentID
m.segmentInfos[segmentID] = info
}
}
for _, segmentInfo := range segmentsCompactionFrom {
delete(m.segmentInfos, segmentInfo.SegmentID)
}
m.segmentMu.Unlock()
m.channelMu.Lock()
for collectionID, channelInfo := range queryChannelInfosMap {
m.queryChannelInfos[collectionID] = channelInfo
......@@ -640,9 +602,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
// remove meta from etcd
for _, info := range removes {
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
err = m.client.Remove(segmentKey)
if err != nil {
if err = m.segmentsInfo.removeSegment(info); err != nil {
panic(err)
}
}
......@@ -664,12 +624,6 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
panic(err)
}
m.segmentMu.Lock()
for _, info := range removes {
delete(m.segmentInfos, info.SegmentID)
}
m.segmentMu.Unlock()
m.channelMu.Lock()
m.queryChannelInfos[collectionID] = queryChannelInfo
m.channelMu.Unlock()
......@@ -727,52 +681,41 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryC
}
func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
results := make([]*querypb.SegmentInfo, 0)
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.CollectionID == collectionID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
ignorePartitionCmp := len(partitionIDs) == 0
partitionFilter := make(map[int64]struct{})
for _, pid := range partitionIDs {
partitionFilter[pid] = struct{}{}
}
segments := m.segmentsInfo.getSegments()
var res []*querypb.SegmentInfo
for _, segment := range segments {
_, ok := partitionFilter[segment.GetPartitionID()]
if (ignorePartitionCmp || ok) && segment.GetCollectionID() == collectionID {
res = append(res, segment)
}
}
if len(partitionIDs) == 0 {
return segmentInfos
}
partitionIDMap := getCompareMapFromSlice(partitionIDs)
for _, info := range segmentInfos {
partitionID := info.PartitionID
if _, ok := partitionIDMap[partitionID]; ok {
results = append(results, info)
}
}
return results
return res
}
func (m *MetaReplica) getSegmentInfoByID(segmentID UniqueID) (*querypb.SegmentInfo, error) {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
if info, ok := m.segmentInfos[segmentID]; ok {
return proto.Clone(info).(*querypb.SegmentInfo), nil
segment := m.segmentsInfo.getSegment(segmentID)
if segment != nil {
return segment, nil
}
return nil, errors.New("getSegmentInfoByID: can't find segmentID in segmentInfos")
}
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.NodeID == nodeID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
var res []*querypb.SegmentInfo
segments := m.segmentsInfo.getSegments()
for _, segment := range segments {
if segment.GetNodeID() == nodeID {
res = append(res, segment)
}
}
return segmentInfos
return res
}
func (m *MetaReplica) getCollectionInfoByID(collectionID UniqueID) (*querypb.CollectionInfo, error) {
......
......@@ -102,8 +102,8 @@ func TestMetaFunc(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
nodeID := defaultQueryNodeID
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
segmentInfos[defaultSegmentID] = &querypb.SegmentInfo{
segmentsInfo := newSegmentsInfo(kv)
segmentsInfo.segmentIDMap[defaultSegmentID] = &querypb.SegmentInfo{
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
......@@ -112,9 +112,9 @@ func TestMetaFunc(t *testing.T) {
meta := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: segmentInfos,
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: segmentsInfo,
}
dmChannels := []string{"testDm1", "testDm2"}
......@@ -297,10 +297,10 @@ func TestReloadMetaFromKV(t *testing.T) {
meta := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: map[UniqueID]*querypb.SegmentInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
deltaChannelInfos: map[UniqueID][]*datapb.VchannelInfo{},
segmentsInfo: newSegmentsInfo(kv),
}
kvs := make(map[string]string)
......@@ -348,9 +348,9 @@ func TestReloadMetaFromKV(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 1, len(meta.collectionInfos))
assert.Equal(t, 1, len(meta.segmentInfos))
assert.Equal(t, 1, len(meta.segmentsInfo.getSegments()))
_, ok := meta.collectionInfos[defaultCollectionID]
assert.Equal(t, true, ok)
_, ok = meta.segmentInfos[defaultSegmentID]
assert.Equal(t, true, ok)
segment := meta.segmentsInfo.getSegment(defaultSegmentID)
assert.NotNil(t, segment)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
)
// segmentsInfo provides interfaces to do persistence/retrieve for segments with an in-memory cache
type segmentsInfo struct {
mu sync.RWMutex
loadOnce sync.Once
segmentIDMap map[int64]*querypb.SegmentInfo
kv kv.TxnKV
}
func newSegmentsInfo(kv kv.TxnKV) *segmentsInfo {
return &segmentsInfo{
kv: kv,
segmentIDMap: make(map[int64]*querypb.SegmentInfo),
}
}
func (s *segmentsInfo) loadSegments() error {
var err error
s.loadOnce.Do(func() {
s.mu.Lock()
defer s.mu.Unlock()
var values []string
_, values, err = s.kv.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil {
return
}
for _, v := range values {
segment := &querypb.SegmentInfo{}
if err = proto.Unmarshal([]byte(v), segment); err != nil {
return
}
s.segmentIDMap[segment.GetSegmentID()] = segment
}
})
return err
}
func (s *segmentsInfo) saveSegment(segment *querypb.SegmentInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
k := getSegmentKey(segment)
v, err := proto.Marshal(segment)
if err != nil {
return err
}
if err = s.kv.Save(k, string(v)); err != nil {
return err
}
s.segmentIDMap[segment.GetSegmentID()] = segment
return nil
}
func (s *segmentsInfo) removeSegment(segment *querypb.SegmentInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
k := getSegmentKey(segment)
if err := s.kv.Remove(k); err != nil {
return err
}
delete(s.segmentIDMap, segment.GetSegmentID())
return nil
}
func (s *segmentsInfo) getSegment(ID int64) *querypb.SegmentInfo {
s.mu.RLock()
defer s.mu.RUnlock()
return s.segmentIDMap[ID]
}
func (s *segmentsInfo) getSegments() []*querypb.SegmentInfo {
s.mu.RLock()
defer s.mu.RUnlock()
res := make([]*querypb.SegmentInfo, 0, len(s.segmentIDMap))
for _, segment := range s.segmentIDMap {
res = append(res, segment)
}
return res
}
func getSegmentKey(segment *querypb.SegmentInfo) string {
return fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, segment.GetCollectionID(), segment.GetPartitionID(),
segment.GetSegmentID())
}
package querycoord
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/stretchr/testify/assert"
)
func Test_segmentsInfo_getSegment(t *testing.T) {
s := newSegmentsInfo(createTestKv(t))
assert.Nil(t, s.loadSegments())
got := s.getSegment(1)
assert.EqualValues(t, 1, got.GetSegmentID())
got = s.getSegment(2)
assert.EqualValues(t, 2, got.GetSegmentID())
segment := &querypb.SegmentInfo{SegmentID: 3, CollectionID: 3}
assert.Nil(t, s.saveSegment(segment))
got = s.getSegment(3)
assert.NotNil(t, got)
assert.True(t, proto.Equal(segment, got))
assert.Nil(t, s.removeSegment(segment))
got = s.getSegment(3)
assert.Nil(t, got)
}
func Test_segmentsInfo_getSegments(t *testing.T) {
s := newSegmentsInfo(createTestKv(t))
assert.Nil(t, s.loadSegments())
got := s.getSegments()
assert.ElementsMatch(t, []int64{1, 2}, collectSegmentIDs(got))
segment := &querypb.SegmentInfo{SegmentID: 3, CollectionID: 3}
assert.Nil(t, s.saveSegment(segment))
got = s.getSegments()
assert.ElementsMatch(t, []int64{1, 2, 3}, collectSegmentIDs(got))
assert.Nil(t, s.saveSegment(segment))
got = s.getSegments()
assert.ElementsMatch(t, []int64{1, 2, 3}, collectSegmentIDs(got))
assert.Nil(t, s.removeSegment(segment))
got = s.getSegments()
assert.ElementsMatch(t, []int64{1, 2}, collectSegmentIDs(got))
}
func createTestKv(t *testing.T) kv.TxnKV {
kv := memkv.NewMemoryKV()
segments := []*querypb.SegmentInfo{
{SegmentID: 1, CollectionID: 1},
{SegmentID: 2, CollectionID: 2},
}
for _, segment := range segments {
k := getSegmentKey(segment)
v, err := proto.Marshal(segment)
assert.Nil(t, err)
assert.Nil(t, kv.Save(k, string(v)))
}
return kv
}
func collectSegmentIDs(segments []*querypb.SegmentInfo) []int64 {
res := make([]int64, 0, len(segments))
for _, s := range segments {
res = append(res, s.GetSegmentID())
}
return res
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册