未验证 提交 70254c3e 编写于 作者: C codeman 提交者: GitHub

Unified catalog interface for segment (#18289) (#18290)

Signed-off-by: Nkejiang <ke.jiang@zilliz.com>
Signed-off-by: Nkejiang <ke.jiang@zilliz.com>
Co-authored-by: Nkejiang <ke.jiang@zilliz.com>
上级 fc42ee5b
......@@ -22,6 +22,7 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
......@@ -153,7 +154,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
},
nil,
&meta{
client: memkv.NewMemoryKV(),
catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()},
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}},
......@@ -202,7 +203,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
},
nil,
&meta{
client: memkv.NewMemoryKV(),
catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()},
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
......@@ -289,7 +290,7 @@ func Test_compactionPlanHandler_segment_is_referenced(t *testing.T) {
},
nil,
&meta{
client: memkv.NewMemoryKV(),
catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()},
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
......
......@@ -166,11 +166,11 @@ func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
}
}
return false*/
return h.s.meta.ChannelHasRemoveFlag(channel)
return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel)
}
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
func (h *ServerHandler) FinishDropChannel(channel string) {
h.s.meta.FinishRemoveChannel(channel)
h.s.meta.catalog.DropChannel(h.s.ctx, channel)
}
......@@ -18,41 +18,35 @@
package datacoord
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
"go.uber.org/zap"
)
const (
metaPrefix = "datacoord-meta"
segmentPrefix = metaPrefix + "/s"
channelRemovePrefix = metaPrefix + "/channel-removal"
removeFlagTomestone = "removed"
)
type meta struct {
sync.RWMutex
client kv.TxnKV // client of a reliable kv service, i.e. etcd client
ctx context.Context
catalog metastore.DataCoordCatalog
collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info
segments *SegmentsInfo // segment id to segment info
}
// NewMeta creates meta from provided `kv.TxnKV`
func newMeta(kv kv.TxnKV) (*meta, error) {
func newMeta(ctx context.Context, kv kv.TxnKV) (*meta, error) {
mt := &meta{
client: kv,
ctx: ctx,
catalog: &datacoord.Catalog{Txn: kv},
collections: make(map[UniqueID]*datapb.CollectionInfo),
segments: NewSegmentsInfo(),
}
......@@ -65,7 +59,7 @@ func newMeta(kv kv.TxnKV) (*meta, error) {
// reloadFromKV loads meta from KV storage
func (m *meta) reloadFromKV() error {
_, values, err := m.client.LoadWithPrefix(segmentPrefix)
segments, err := m.catalog.ListSegments(m.ctx)
if err != nil {
return err
}
......@@ -77,17 +71,11 @@ func (m *meta) reloadFromKV() error {
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Set(0)
metrics.DataCoordNumStoredRows.WithLabelValues().Set(0)
numStoredRows := int64(0)
for _, value := range values {
segmentInfo := &datapb.SegmentInfo{}
err = proto.Unmarshal([]byte(value), segmentInfo)
if err != nil {
return fmt.Errorf("DataCoord reloadFromKV UnMarshal datapb.SegmentInfo err:%w", err)
}
state := segmentInfo.GetState()
m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc()
if state == commonpb.SegmentState_Flushed {
numStoredRows += segmentInfo.GetNumOfRows()
for _, segment := range segments {
m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
metrics.DataCoordNumSegments.WithLabelValues(segment.State.String()).Inc()
if segment.State == commonpb.SegmentState_Flushed {
numStoredRows += segment.NumOfRows
}
}
metrics.DataCoordNumStoredRows.WithLabelValues().Set(float64(numStoredRows))
......@@ -171,7 +159,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
func (m *meta) AddSegment(segment *SegmentInfo) error {
m.Lock()
defer m.Unlock()
if err := m.saveSegmentInfo(segment); err != nil {
if err := m.catalog.AddSegment(m.ctx, segment.SegmentInfo); err != nil {
return err
}
m.segments.SetSegment(segment.GetID(), segment)
......@@ -187,7 +175,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
if segment == nil {
return nil
}
if err := m.removeSegmentInfo(segment); err != nil {
if err := m.catalog.DropSegment(m.ctx, segment.SegmentInfo); err != nil {
return err
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Inc()
......@@ -231,7 +219,7 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
m.segments.SetState(segmentID, state)
curSegInfo = m.segments.GetSegment(segmentID)
if curSegInfo != nil && isSegmentHealthy(curSegInfo) {
err := m.saveSegmentInfo(curSegInfo)
err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{curSegInfo.SegmentInfo})
if err == nil {
metrics.DataCoordNumSegments.WithLabelValues(oldState.String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc()
......@@ -377,21 +365,11 @@ func (m *meta) UpdateFlushSegmentsInfo(
modSegments[cp.GetSegmentID()] = s
}
kv := make(map[string]string)
for _, segment := range modSegments {
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
return fmt.Errorf("dataCoord UpdateFlushSegmentsInfo segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kv[key] = string(segBytes)
}
if len(kv) == 0 {
return nil
segments := make([]*datapb.SegmentInfo, 0, len(modSegments))
for _, seg := range modSegments {
segments = append(segments, seg.SegmentInfo)
}
if err := m.saveKvTxn(kv); err != nil {
if err := m.catalog.AlterSegments(m.ctx, segments); err != nil {
log.Error("failed to store flush segment info into Etcd", zap.Error(err))
return err
}
......@@ -422,6 +400,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
modSegments := make(map[UniqueID]*SegmentInfo)
originSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
for _, seg2Drop := range segments {
segment := m.mergeDropSegment(seg2Drop)
if segment != nil {
......@@ -429,7 +408,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
modSegments[seg2Drop.GetID()] = segment
}
}
// set all channels of channel to dropped
// set existed segments of channel to Dropped
for _, seg := range m.segments.segments {
if seg.InsertChannel != channel {
continue
......@@ -447,8 +426,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
if err == nil {
for _, seg := range originSegments {
state := seg.GetState()
metrics.DataCoordNumSegments.WithLabelValues(
state.String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Dec()
if state == commonpb.SegmentState_Flushed {
metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(seg.GetNumOfRows()))
}
......@@ -540,60 +518,33 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
return m.saveDropSegmentAndRemove(channel, modSegments, true)
}
func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool) error {
kv := make(map[string]string)
update := make([]*SegmentInfo, 0, maxOperationsPerTxn)
size := 0
for id, s := range modSegments {
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
delete(modSegments, id)
segBytes, err := proto.Marshal(s.SegmentInfo)
if err != nil {
return fmt.Errorf("DataCoord UpdateDropChannelSegmentInfo segmentID:%d, marshal failed:%w", s.GetID(), err)
}
kv[key] = string(segBytes)
update = append(update, s)
size += len(key) + len(segBytes)
if len(kv) == maxOperationsPerTxn || len(modSegments) == 1 || size >= maxBytesPerTxn {
break
}
}
if withFlag {
// add removal flag into meta, preventing non-atomic removal channel failure
removalFlag := buildChannelRemovePath(channel)
kv[removalFlag] = removeFlagTomestone
func (m *meta) saveDropSegmentAndRemove(channel string, segments map[int64]*SegmentInfo, withFlag bool) error {
segmentMap := make(map[int64]*datapb.SegmentInfo)
for id, seg := range segments {
segmentMap[id] = seg.SegmentInfo
}
err := m.saveKvTxn(kv)
// TODO: RootCoord supports read-write prohibit when dropping collection
// divides two api calls: save dropped segments & mark channel deleted
updateIDs, err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segmentMap)
if err != nil {
log.Warn("Failed to txn save segment info batch for DropChannel", zap.Error(err))
return err
}
if withFlag {
err = m.catalog.MarkChannelDeleted(m.ctx, channel)
if err != nil {
return err
}
}
// update memory info
for _, s := range update {
m.segments.SetSegment(s.GetID(), s)
for _, id := range updateIDs {
m.segments.SetSegment(id, segments[id])
delete(segments, id)
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(update)))
return nil
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(updateIDs)))
// FinishRemoveChannel removes channel remove flag after whole procedure is finished
func (m *meta) FinishRemoveChannel(channel string) error {
key := buildChannelRemovePath(channel)
return m.client.Remove(key)
}
// ChannelHasRemoveFlag
func (m *meta) ChannelHasRemoveFlag(channel string) bool {
key := buildChannelRemovePath(channel)
v, err := m.client.Load(key)
if err != nil || v != removeFlagTomestone {
return false
}
return true
return nil
}
// ListSegmentFiles lists all segments' logs
......@@ -742,7 +693,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
defer m.Unlock()
m.segments.AddAllocation(segmentID, allocation)
if segInfo := m.segments.GetSegment(segmentID); segInfo != nil {
return m.saveSegmentInfo(segInfo)
// update segment LastExpireTime
return m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{segInfo.SegmentInfo})
}
return nil
}
......@@ -854,25 +806,12 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
zap.Int64("NumOfRows", segmentInfo.NumOfRows),
zap.Any("compactionFrom", segmentInfo.CompactionFrom))
data := make(map[string]string)
modSegments := make([]*datapb.SegmentInfo, 0, len(segments))
for _, s := range segments {
k, v, err := m.marshal(s)
if err != nil {
return err
}
data[k] = v
}
if segment.NumOfRows > 0 {
k, v, err := m.marshal(segment)
if err != nil {
return err
}
data[k] = v
modSegments = append(modSegments, s.SegmentInfo)
}
if err := m.saveKvTxn(data); err != nil {
if err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modSegments, segment.SegmentInfo); err != nil {
return err
}
......@@ -894,7 +833,7 @@ func (m *meta) CompleteInnerCompaction(segmentBinlogs *datapb.CompactionSegmentB
if segment := m.segments.GetSegment(segmentBinlogs.SegmentID); segment != nil {
// The compaction deletes the entire segment
if result.NumOfRows <= 0 {
err := m.removeSegmentInfo(segment)
err := m.catalog.DropSegment(m.ctx, segment.SegmentInfo)
if err != nil {
return err
}
......@@ -907,7 +846,7 @@ func (m *meta) CompleteInnerCompaction(segmentBinlogs *datapb.CompactionSegmentB
cloned.Binlogs = m.updateBinlogs(cloned.GetBinlogs(), segmentBinlogs.GetFieldBinlogs(), result.GetInsertLogs())
cloned.Statslogs = m.updateBinlogs(cloned.GetStatslogs(), segmentBinlogs.GetField2StatslogPaths(), result.GetField2StatslogPaths())
cloned.Deltalogs = m.updateDeltalogs(cloned.GetDeltalogs(), segmentBinlogs.GetDeltalogs(), result.GetDeltalogs())
if err := m.saveSegmentInfo(cloned); err != nil {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo}); err != nil {
return err
}
......@@ -996,74 +935,6 @@ func (m *meta) updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.F
return res
}
func (m *meta) marshal(segment *SegmentInfo) (string, string, error) {
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
return "", "", fmt.Errorf("failed to marshal segment info, %v", err)
}
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return key, string(segBytes), nil
}
// saveSegmentInfo utility function saving segment info into kv store
func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
log.Error("DataCoord saveSegmentInfo marshal failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord saveSegmentInfo segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
kvs := make(map[string]string)
dataKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kvs[dataKey] = string(segBytes)
if segment.State == commonpb.SegmentState_Flushed {
handoffSegmentInfo := &querypb.SegmentInfo{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
DmChannel: segment.InsertChannel,
SegmentState: commonpb.SegmentState_Sealed,
CreatedByCompaction: segment.GetCreatedByCompaction(),
CompactionFrom: segment.GetCompactionFrom(),
}
handoffSegBytes, err := proto.Marshal(handoffSegmentInfo)
if err != nil {
log.Error("DataCoord saveSegmentInfo marshal handoffSegInfo failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord saveSegmentInfo segmentID:%d, marshal handoffSegInfo failed:%w", segment.GetID(), err)
}
queryKey := buildQuerySegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kvs[queryKey] = string(handoffSegBytes)
}
return m.client.MultiSave(kvs)
}
// removeSegmentInfo utility function removing segment info from kv store
// Note that nil parameter will cause panicking
func (m *meta) removeSegmentInfo(segment *SegmentInfo) error {
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Remove(key)
}
// saveKvTxn batch save kvs
func (m *meta) saveKvTxn(kv map[string]string) error {
return m.client.MultiSave(kv)
}
// buildSegmentPath common logic mapping segment info to corresponding key in kv store
func buildSegmentPath(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, collectionID, partitionID, segmentID)
}
// buildQuerySegmentPath common logic mapping segment info to corresponding key of queryCoord in kv store
func buildQuerySegmentPath(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collectionID, partitionID, segmentID)
}
// buildChannelRemovePat builds vchannel remove flag path
func buildChannelRemovePath(channel string) string {
return fmt.Sprintf("%s/%s", channelRemovePrefix, channel)
}
// buildSegment utility function for compose datapb.SegmentInfo struct with provided info
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) *SegmentInfo {
info := &datapb.SegmentInfo{
......
......@@ -18,13 +18,16 @@ package datacoord
import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"testing"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -32,6 +35,203 @@ import (
"github.com/stretchr/testify/assert"
)
type mockEtcdKv struct {
kv.TxnKV
}
func (mek *mockEtcdKv) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentPrefix):
segInfo := &datapb.SegmentInfo{ID: 1, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentBinlogPathPrefix):
segInfo := getFieldBinlogPaths(1, "binlog1")
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentDeltalogPathPrefix):
segInfo := getFieldBinlogPaths(1, "deltalog1")
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentStatslogPathPrefix):
segInfo := getFieldBinlogPaths(1, "statslog1")
val, _ = proto.Marshal(segInfo)
default:
return nil, nil, fmt.Errorf("invalid key")
}
return nil, []string{string(val)}, nil
}
type mockKvLoadSegmentError struct {
kv.TxnKV
}
func (mek *mockKvLoadSegmentError) LoadWithPrefix(key string) ([]string, []string, error) {
if strings.Contains(key, datacoord.SegmentPrefix) {
return nil, nil, fmt.Errorf("segment LoadWithPrefix error")
}
return nil, nil, nil
}
type mockKvLoadBinlogError struct {
kv.TxnKV
}
func (mek *mockKvLoadBinlogError) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentPrefix):
segInfo := &datapb.SegmentInfo{ID: 1, Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "delta_log_1")}}
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentBinlogPathPrefix):
return nil, nil, fmt.Errorf("LoadWithPrefix for binlogs error")
}
return nil, []string{string(val)}, nil
}
type mockKvLoadDeltaBinlogError struct {
kv.TxnKV
}
func (mek *mockKvLoadDeltaBinlogError) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentPrefix):
segInfo := &datapb.SegmentInfo{ID: 1, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog_1")}}
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentDeltalogPathPrefix):
return nil, nil, fmt.Errorf("LoadWithPrefix for deltalog error")
}
return nil, []string{string(val)}, nil
}
type mockKvLoadStatsBinlogError struct {
kv.TxnKV
}
func (mek *mockKvLoadStatsBinlogError) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentPrefix+"/"):
segInfo := &datapb.SegmentInfo{ID: 1, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog_1")}}
val, _ = proto.Marshal(segInfo)
case strings.Contains(key, datacoord.SegmentStatslogPathPrefix):
return nil, nil, fmt.Errorf("LoadWithPrefix for statslog error")
}
return nil, []string{string(val)}, nil
}
type mockKvIllegalSegment struct {
kv.TxnKV
}
func (mek *mockKvIllegalSegment) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentPrefix):
val = []byte{'i', 'l', 'l', 'e', 'g', 'a', 'l'}
}
return nil, []string{string(val)}, nil
}
type mockKvIllegalBinlog struct {
kv.TxnKV
}
func (mek *mockKvIllegalBinlog) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentBinlogPathPrefix):
val = []byte{'i', 'l', 'l', 'e', 'g', 'a', 'l'}
}
return nil, []string{string(val)}, nil
}
type mockKvIllegalDeltalog struct {
kv.TxnKV
}
func (mek *mockKvIllegalDeltalog) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentDeltalogPathPrefix):
val = []byte{'i', 'l', 'l', 'e', 'g', 'a', 'l'}
}
return nil, []string{string(val)}, nil
}
type mockKvIllegalStatslog struct {
kv.TxnKV
}
func (mek *mockKvIllegalStatslog) LoadWithPrefix(key string) ([]string, []string, error) {
var val []byte
switch {
case strings.Contains(key, datacoord.SegmentStatslogPathPrefix):
val = []byte{'i', 'l', 'l', 'e', 'g', 'a', 'l'}
}
return nil, []string{string(val)}, nil
}
func TestMetaReloadFromKV(t *testing.T) {
t.Run("Test ReloadFromKV success", func(t *testing.T) {
fkv := &mockEtcdKv{}
_, err := newMeta(context.TODO(), fkv)
assert.Nil(t, err)
})
// load segment error
t.Run("Test ReloadFromKV load segment fails", func(t *testing.T) {
fkv := &mockKvLoadSegmentError{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
// illegal segment info
t.Run("Test ReloadFromKV unmarshal segment fails", func(t *testing.T) {
fkv := &mockKvIllegalSegment{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
// load binlog/deltalog/statslog error
t.Run("Test ReloadFromKV load binlog fails", func(t *testing.T) {
fkv := &mockKvLoadBinlogError{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
t.Run("Test ReloadFromKV load deltalog fails", func(t *testing.T) {
fkv := &mockKvLoadDeltaBinlogError{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
t.Run("Test ReloadFromKV load statslog fails", func(t *testing.T) {
fkv := &mockKvLoadStatsBinlogError{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
// illegal binlog/deltalog/statslog info
t.Run("Test ReloadFromKV unmarshal binlog fails", func(t *testing.T) {
fkv := &mockKvIllegalBinlog{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
t.Run("Test ReloadFromKV unmarshal deltalog fails", func(t *testing.T) {
fkv := &mockKvIllegalDeltalog{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
t.Run("Test ReloadFromKV unmarshal statslog fails", func(t *testing.T) {
fkv := &mockKvIllegalStatslog{}
_, err := newMeta(context.TODO(), fkv)
assert.NotNil(t, err)
})
}
func TestMeta_Basic(t *testing.T) {
const collID = UniqueID(0)
const partID0 = UniqueID(100)
......@@ -135,14 +335,14 @@ func TestMeta_Basic(t *testing.T) {
// inject error for `Save`
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(fkv)
meta, err := newMeta(context.TODO(), fkv)
assert.Nil(t, err)
err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{}))
assert.NotNil(t, err)
fkv2 := &removeFailKV{TxnKV: memoryKV}
meta, err = newMeta(fkv2)
meta, err = newMeta(context.TODO(), fkv2)
assert.Nil(t, err)
// nil, since no segment yet
err = meta.DropSegment(0)
......@@ -234,7 +434,7 @@ func TestGetUnFlushedSegments(t *testing.T) {
func TestUpdateFlushSegmentsInfo(t *testing.T) {
t.Run("normal", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV())
assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")},
......@@ -260,7 +460,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
})
t.Run("update non-existed segment", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV())
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, false, false, nil, nil, nil, nil, nil)
......@@ -268,7 +468,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
})
t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV())
assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}}
......@@ -285,7 +485,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
t.Run("test save etcd failed", func(t *testing.T) {
kv := memkv.NewMemoryKV()
failedKv := &saveFailKV{kv}
meta, err := newMeta(failedKv)
meta, err := newMeta(context.TODO(), failedKv)
assert.Nil(t, err)
segmentInfo := &SegmentInfo{
......@@ -312,7 +512,8 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
}
func TestSaveHandoffMeta(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
kvClient := memkv.NewMemoryKV()
meta, err := newMeta(context.TODO(), kvClient)
assert.Nil(t, err)
info := &datapb.SegmentInfo{
......@@ -323,10 +524,10 @@ func TestSaveHandoffMeta(t *testing.T) {
SegmentInfo: info,
}
err = meta.saveSegmentInfo(segmentInfo)
err = meta.catalog.AddSegment(context.TODO(), segmentInfo.SegmentInfo)
assert.Nil(t, err)
keys, _, err := meta.client.LoadWithPrefix(util.HandoffSegmentPrefix)
keys, _, err := kvClient.LoadWithPrefix(util.HandoffSegmentPrefix)
assert.Nil(t, err)
assert.Equal(t, 1, len(keys))
segmentID, err := strconv.ParseInt(filepath.Base(keys[0]), 10, 64)
......@@ -444,7 +645,7 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
client: tt.fields.client,
catalog: &datacoord.Catalog{Txn: tt.fields.client},
collections: tt.fields.collections,
segments: tt.fields.segments,
}
......@@ -566,7 +767,7 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
client: tt.fields.client,
catalog: &datacoord.Catalog{Txn: tt.fields.client},
collections: tt.fields.collections,
segments: tt.fields.segments,
}
......@@ -620,7 +821,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
client: tt.fields.client,
catalog: &datacoord.Catalog{Txn: tt.fields.client},
segments: tt.fields.segments,
}
m.SetSegmentCompacting(tt.args.segmentID, tt.args.compacting)
......
......@@ -40,7 +40,7 @@ import (
func newMemoryMeta(allocator allocator) (*meta, error) {
memoryKV := memkv.NewMemoryKV()
return newMeta(memoryKV)
return newMeta(context.TODO(), memoryKV)
}
var _ allocator = (*MockAllocator)(nil)
......@@ -97,7 +97,7 @@ func (kv *saveFailKV) MultiSave(kvs map[string]string) error {
type removeFailKV struct{ kv.TxnKV }
// Remove override behavior, inject error
func (kv *removeFailKV) Remove(key string) error {
func (kv *removeFailKV) MultiRemove(key []string) error {
return errors.New("mocked fail")
}
......
......@@ -24,6 +24,7 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
......@@ -491,7 +492,7 @@ func TestTryToSealSegment(t *testing.T) {
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(memoryKV)
meta, err := newMeta(context.TODO(), memoryKV)
assert.Nil(t, err)
......@@ -504,7 +505,7 @@ func TestTryToSealSegment(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
segmentManager.meta.client = fkv
segmentManager.meta.catalog = &datacoord.Catalog{Txn: fkv}
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
......@@ -517,7 +518,7 @@ func TestTryToSealSegment(t *testing.T) {
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(memoryKV)
meta, err := newMeta(context.TODO(), memoryKV)
assert.Nil(t, err)
......@@ -530,7 +531,7 @@ func TestTryToSealSegment(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
segmentManager.meta.client = fkv
segmentManager.meta.catalog = &datacoord.Catalog{Txn: fkv}
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
......
......@@ -481,7 +481,7 @@ func (s *Server) initMeta() error {
s.kvClient = etcdKV
reloadEtcdFn := func() error {
var err error
s.meta, err = newMeta(s.kvClient)
s.meta, err = newMeta(s.ctx, s.kvClient)
if err != nil {
return err
}
......
......@@ -1174,7 +1174,7 @@ func TestSaveBinlogPaths(t *testing.T) {
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
err := svr.meta.AddSegment(&SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
Segment: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "ch1",
......@@ -1827,8 +1827,7 @@ func TestShouldDropChannel(t *testing.T) {
})
t.Run("channel in remove flag", func(t *testing.T) {
key := buildChannelRemovePath("ch1")
err := svr.meta.client.Save(key, removeFlagTomestone)
err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1")
require.NoError(t, err)
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
......@@ -2627,7 +2626,7 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) {
svr.meta.Lock()
func() {
defer svr.meta.Unlock()
svr.meta, _ = newMeta(&mockTxnKVext{})
svr.meta, _ = newMeta(context.TODO(), &mockTxnKVext{})
}()
defer closeTestServer(t, svr)
segment := &datapb.SegmentInfo{
......
......@@ -3,13 +3,13 @@ package metastore
import (
"context"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Catalog interface {
type RootCoordCatalog interface {
CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error)
GetCollectionByName(ctx context.Context, collectionName string, ts typeutil.Timestamp) (*model.Collection, error)
......@@ -56,3 +56,16 @@ const (
DELETE
MODIFY
)
type DataCoordCatalog interface {
ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error)
AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error
AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo) error
// AlterSegmentsAndAddNewSegment for transaction
AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error
SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error)
DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
MarkChannelDeleted(ctx context.Context, channel string) error
IsChannelDropped(ctx context.Context, channel string) bool
DropChannel(ctx context.Context, channel string) error
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
const (
MetaPrefix = "datacoord-meta"
SegmentPrefix = MetaPrefix + "/s"
SegmentBinlogPathPrefix = MetaPrefix + "/binlog"
SegmentDeltalogPathPrefix = MetaPrefix + "/deltalog"
SegmentStatslogPathPrefix = MetaPrefix + "/statslog"
ChannelRemovePrefix = MetaPrefix + "/channel-removal"
RemoveFlagTomestone = "removed"
MaxOperationsPerTxn = 64
MaxBytesPerTxn = 1024 * 1024
)
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type Catalog struct {
Txn kv.TxnKV
}
func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) {
_, values, err := kc.Txn.LoadWithPrefix(SegmentPrefix)
if err != nil {
return nil, err
}
// get segment info
var segments []*datapb.SegmentInfo
for _, value := range values {
segmentInfo := &datapb.SegmentInfo{}
err = proto.Unmarshal([]byte(value), segmentInfo)
if err != nil {
log.Error("unmarshal segment info error", zap.Int64("segmentID", segmentInfo.ID), zap.Int64("collID", segmentInfo.CollectionID), zap.Error(err))
return nil, err
}
segments = append(segments, segmentInfo)
}
// get binlogs from segment meta first for compatible
var binlogs, deltalogs, statslogs []*datapb.FieldBinlog
for _, segmentInfo := range segments {
if len(segmentInfo.Binlogs) == 0 {
binlogs, err = kc.unmarshalBinlog(storage.InsertBinlog, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.ID)
if err != nil {
return nil, err
}
}
if len(segmentInfo.Deltalogs) == 0 {
deltalogs, err = kc.unmarshalBinlog(storage.DeleteBinlog, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.CollectionID)
if err != nil {
return nil, err
}
}
if len(segmentInfo.Statslogs) == 0 {
statslogs, err = kc.unmarshalBinlog(storage.StatsBinlog, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.PartitionID)
if err != nil {
return nil, err
}
}
segmentInfo.Binlogs = binlogs
segmentInfo.Deltalogs = deltalogs
segmentInfo.Statslogs = statslogs
}
return segments, nil
}
func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
// save binlogs separately
kvs, err := buildBinlogKvPair(segment)
if err != nil {
return err
}
// save segment info
k, v, err := buildSegmentKeyValuePair(segment)
if err != nil {
return err
}
kvs[k] = v
// save handoff req if segment is flushed
if segment.State == commonpb.SegmentState_Flushed {
handoffSegmentInfo := &querypb.SegmentInfo{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
DmChannel: segment.InsertChannel,
SegmentState: commonpb.SegmentState_Sealed,
CreatedByCompaction: segment.CreatedByCompaction,
CompactionFrom: segment.CompactionFrom,
}
handoffSegBytes, err := proto.Marshal(handoffSegmentInfo)
if err != nil {
return fmt.Errorf("add segmentID:%d, marshal handoff segment info failed:%w", segment.GetID(), err)
}
queryKey := buildQuerySegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kvs[queryKey] = string(handoffSegBytes)
}
return kc.Txn.MultiSave(kvs)
}
func (kc *Catalog) AlterSegments(ctx context.Context, modSegments []*datapb.SegmentInfo) error {
kv := make(map[string]string)
for _, segment := range modSegments {
// save binlogs separately
binlogKvs, err := buildBinlogKvPair(segment)
if err != nil {
return err
}
kv = typeutil.MergeMap(binlogKvs, kv)
// save segment info
k, v, err := buildSegmentKeyValuePair(segment)
if err != nil {
return err
}
kv[k] = v
// save handoff req if segment is flushed
if segment.State == commonpb.SegmentState_Flushed {
handoffSegmentInfo := &querypb.SegmentInfo{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
DmChannel: segment.InsertChannel,
SegmentState: commonpb.SegmentState_Sealed,
CreatedByCompaction: segment.CreatedByCompaction,
CompactionFrom: segment.CompactionFrom,
}
handoffSegBytes, err := proto.Marshal(handoffSegmentInfo)
if err != nil {
return fmt.Errorf("failed to marshal segment: %d, error: %w", segment.GetID(), err)
}
queryKey := buildQuerySegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kv[queryKey] = string(handoffSegBytes)
}
}
return kc.Txn.MultiSave(kv)
}
func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
data := make(map[string]string)
for _, s := range segments {
k, v, err := buildSegmentKeyValuePair(s)
if err != nil {
return err
}
data[k] = v
}
if newSegment.NumOfRows > 0 {
// save binlogs separately
binlogKvs, err := buildBinlogKvPair(newSegment)
if err != nil {
return err
}
data = typeutil.MergeMap(binlogKvs, data)
// save segment info
k, v, err := buildSegmentKeyValuePair(newSegment)
if err != nil {
return err
}
data[k] = v
}
err := kc.Txn.MultiSave(data)
if err != nil {
log.Error("batch save segments failed", zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error) {
kvs := make(map[string]string)
batchIDs := make([]int64, 0, MaxOperationsPerTxn)
size := 0
for id, s := range modSegments {
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
segBytes, err := proto.Marshal(s)
if err != nil {
return nil, fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err)
}
kvs[key] = string(segBytes)
batchIDs = append(batchIDs, s.ID)
size += len(key) + len(segBytes)
// remove record from map `modSegments`
delete(modSegments, id)
// batch stops when one of conditions matched:
// 1. number of records is equals MaxOperationsPerTxn
// 2. left number of modSegments is equals 1
// 3. bytes size is greater than MaxBytesPerTxn
if len(kvs) == MaxOperationsPerTxn || len(modSegments) == 1 || size >= MaxBytesPerTxn {
break
}
}
err := kc.Txn.MultiSave(kvs)
if err != nil {
log.Error("Failed to save segments in batch for DropChannel", zap.Any("segmentIDs", batchIDs), zap.Error(err))
return nil, err
}
return batchIDs, nil
}
func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
segKey := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
keys := []string{segKey}
binlogKvs, err := buildBinlogKvPair(segment)
if err != nil {
return err
}
binlogKeys := typeutil.GetMapKeys(binlogKvs)
keys = append(keys, binlogKeys...)
return kc.Txn.MultiRemove(keys)
}
func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error {
key := buildChannelRemovePath(channel)
err := kc.Txn.Save(key, RemoveFlagTomestone)
if err != nil {
log.Error("Failed to mark channel dropped", zap.String("channel", channel), zap.Error(err))
return err
}
return nil
}
func (kc *Catalog) IsChannelDropped(ctx context.Context, channel string) bool {
key := buildChannelRemovePath(channel)
v, err := kc.Txn.Load(key)
if err != nil || v != RemoveFlagTomestone {
return false
}
return true
}
// DropChannel removes channel remove flag after whole procedure is finished
func (kc *Catalog) DropChannel(ctx context.Context, channel string) error {
key := buildChannelRemovePath(channel)
return kc.Txn.Remove(key)
}
func (kc *Catalog) unmarshalBinlog(binlogType storage.BinlogType, collectionID, partitionID, segmentID typeutil.UniqueID) ([]*datapb.FieldBinlog, error) {
// unmarshal binlog/deltalog/statslog
var binlogPrefix string
switch binlogType {
case storage.InsertBinlog:
binlogPrefix = buildFieldBinlogPathPrefix(collectionID, partitionID, segmentID)
case storage.DeleteBinlog:
binlogPrefix = buildFieldDeltalogPathPrefix(collectionID, partitionID, segmentID)
case storage.StatsBinlog:
binlogPrefix = buildFieldStatslogPathPrefix(collectionID, partitionID, segmentID)
default:
return nil, fmt.Errorf("invalid binlog type: %d", binlogType)
}
_, values, err := kc.Txn.LoadWithPrefix(binlogPrefix)
if err != nil {
return nil, err
}
result := make([]*datapb.FieldBinlog, len(values))
for i, value := range values {
fieldBinlog := &datapb.FieldBinlog{}
err = proto.Unmarshal([]byte(value), fieldBinlog)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal datapb.FieldBinlog: %d, err:%w", fieldBinlog.FieldID, err)
}
result[i] = fieldBinlog
}
return result, nil
}
func buildBinlogKvPair(segment *datapb.SegmentInfo) (map[string]string, error) {
kv := make(map[string]string)
// binlog kv
for _, binlog := range segment.Binlogs {
binlogBytes, err := proto.Marshal(binlog)
if err != nil {
return nil, fmt.Errorf("marshal binlogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", segment.CollectionID, segment.GetID(), binlog.FieldID, err)
}
key := buildFieldBinlogPath(segment.CollectionID, segment.PartitionID, segment.ID, binlog.FieldID)
kv[key] = string(binlogBytes)
}
// deltalog etcd kv
for _, deltalog := range segment.Deltalogs {
binlogBytes, err := proto.Marshal(deltalog)
if err != nil {
return nil, fmt.Errorf("marshal deltalogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", segment.CollectionID, segment.GetID(), deltalog.FieldID, err)
}
key := buildFieldDeltalogPath(segment.CollectionID, segment.PartitionID, segment.ID, deltalog.FieldID)
kv[key] = string(binlogBytes)
}
// statslog etcd kv
for _, statslog := range segment.Statslogs {
binlogBytes, err := proto.Marshal(statslog)
if err != nil {
return nil, fmt.Errorf("marshal statslogs failed, collectionID:%d, segmentID:%d, fieldID:%d, error:%w", segment.CollectionID, segment.GetID(), statslog.FieldID, err)
}
key := buildFieldStatslogPath(segment.CollectionID, segment.PartitionID, segment.ID, statslog.FieldID)
kv[key] = string(binlogBytes)
}
return kv, nil
}
func buildSegmentKeyValuePair(segment *datapb.SegmentInfo) (string, string, error) {
clonedSegment := proto.Clone(segment).(*datapb.SegmentInfo)
clonedSegment.Binlogs = nil
clonedSegment.Deltalogs = nil
clonedSegment.Statslogs = nil
segBytes, err := proto.Marshal(clonedSegment)
if err != nil {
return "", "", fmt.Errorf("failed to marshal segment: %d, err: %w", segment.ID, err)
}
key := buildSegmentPath(clonedSegment.GetCollectionID(), clonedSegment.GetPartitionID(), clonedSegment.GetID())
return key, string(segBytes), nil
}
// buildSegmentPath common logic mapping segment info to corresponding key in kv store
func buildSegmentPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", SegmentPrefix, collectionID, partitionID, segmentID)
}
func buildFieldBinlogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentBinlogPathPrefix, collectionID, partitionID, segmentID, fieldID)
}
func buildFieldDeltalogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentDeltalogPathPrefix, collectionID, partitionID, segmentID, fieldID)
}
func buildFieldStatslogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentStatslogPathPrefix, collectionID, partitionID, segmentID, fieldID)
}
// buildQuerySegmentPath common logic mapping segment info to corresponding key of queryCoord in kv store
func buildQuerySegmentPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collectionID, partitionID, segmentID)
}
func buildFieldBinlogPathPrefix(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", SegmentBinlogPathPrefix, collectionID, partitionID, segmentID)
}
func buildFieldDeltalogPathPrefix(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", SegmentDeltalogPathPrefix, collectionID, partitionID, segmentID)
}
func buildFieldStatslogPathPrefix(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", SegmentStatslogPathPrefix, collectionID, partitionID, segmentID)
}
// buildChannelRemovePath builds vchannel remove flag path
func buildChannelRemovePath(channel string) string {
return fmt.Sprintf("%s/%s", ChannelRemovePrefix, channel)
}
package datacoord
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
type MockedTxnKV struct {
kv.TxnKV
multiSave func(kvs map[string]string) error
save func(key, value string) error
}
func (mc *MockedTxnKV) MultiSave(kvs map[string]string) error {
return mc.multiSave(kvs)
}
func (mc *MockedTxnKV) Save(key, value string) error {
return mc.save(key, value)
}
var (
segments = []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1000,
},
}
newSegment = &datapb.SegmentInfo{
ID: 2,
CollectionID: 1000,
}
)
func Test_AlterSegmentsAndAddNewSegment_SaveError(t *testing.T) {
txn := &MockedTxnKV{}
txn.multiSave = func(kvs map[string]string) error {
return errors.New("error")
}
catalog := &Catalog{txn}
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), segments, newSegment)
assert.Error(t, err)
}
func Test_SaveDroppedSegmentsInBatch_SaveError(t *testing.T) {
txn := &MockedTxnKV{}
txn.multiSave = func(kvs map[string]string) error {
return errors.New("error")
}
catalog := &Catalog{txn}
segments := map[int64]*datapb.SegmentInfo{
1: {
ID: 1,
CollectionID: 1000,
},
}
ids, err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments)
assert.Nil(t, ids)
assert.Error(t, err)
}
func Test_MarkChannelDeleted_SaveError(t *testing.T) {
txn := &MockedTxnKV{}
txn.save = func(key, value string) error {
return errors.New("error")
}
catalog := &Catalog{txn}
err := catalog.MarkChannelDeleted(context.TODO(), "test_channel_1")
assert.Error(t, err)
}
......@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"context"
......
......@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"context"
......
package kv
package rootcoord
const (
// ComponentPrefix prefix for rootcoord component
......
......@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"bytes"
......
......@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
package rootcoord
import (
"fmt"
......
......@@ -254,7 +254,6 @@ message SegmentInfo {
repeated FieldBinlog statslogs = 12;
// deltalogs consists of delete binlogs. FieldID is not used yet since delete is always applied on primary key
repeated FieldBinlog deltalogs = 13;
bool createdByCompaction = 14;
repeated int64 compactionFrom = 15;
uint64 dropped_at = 16; // timestamp when segment marked drop
......@@ -347,7 +346,7 @@ message Binlog {
uint64 timestamp_from = 2;
uint64 timestamp_to = 3;
string log_path = 4;
int64 log_size = 5;
int64 log_size = 5;
}
message GetRecoveryInfoResponse {
......
......@@ -24,7 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -38,13 +38,13 @@ import (
const (
// TimestampPrefix prefix for timestamp
TimestampPrefix = kvmetestore.ComponentPrefix + "/timestamp"
TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp"
// DDOperationPrefix prefix for DD operation
DDOperationPrefix = kvmetestore.ComponentPrefix + "/dd-operation"
DDOperationPrefix = rootcoord.ComponentPrefix + "/dd-operation"
// DDMsgSendPrefix prefix to indicate whether DD msg has been send
DDMsgSendPrefix = kvmetestore.ComponentPrefix + "/dd-msg-send"
DDMsgSendPrefix = rootcoord.ComponentPrefix + "/dd-msg-send"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"
......@@ -68,7 +68,7 @@ const (
// MetaTable store all rootCoord meta info
type MetaTable struct {
ctx context.Context
catalog metastore.Catalog
catalog metastore.RootCoordCatalog
collID2Meta map[typeutil.UniqueID]model.Collection // collection id -> collection meta
collName2ID map[string]typeutil.UniqueID // collection name to collection id
......@@ -82,7 +82,7 @@ type MetaTable struct {
// NewMetaTable creates meta table for rootcoord, which stores all in-memory information
// for collection, partition, segment, index etc.
func NewMetaTable(ctx context.Context, catalog metastore.Catalog) (*MetaTable, error) {
func NewMetaTable(ctx context.Context, catalog metastore.RootCoordCatalog) (*MetaTable, error) {
mt := &MetaTable{
ctx: contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName),
catalog: catalog,
......
......@@ -34,7 +34,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/commonpb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
......@@ -126,12 +126,12 @@ func generateMetaTable(t *testing.T) (*MetaTable, *mockTestKV, *mockTestTxnKV, f
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
require.Nil(t, err)
skv, err := kvmetestore.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
skv, err := rootcoord.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
txnkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
_, err = NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: txnkv, Snapshot: skv})
_, err = NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: txnkv, Snapshot: skv})
assert.Nil(t, err)
mockSnapshotKV := &mockTestKV{
SnapShotKV: skv,
......@@ -150,7 +150,7 @@ func generateMetaTable(t *testing.T) (*MetaTable, *mockTestKV, *mockTestTxnKV, f
remove: func(key string) error { return txnkv.Remove(key) },
}
mockMt, err := NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: mockTxnKV, Snapshot: mockSnapshotKV})
mockMt, err := NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: mockTxnKV, Snapshot: mockSnapshotKV})
assert.Nil(t, err)
return mockMt, mockSnapshotKV, mockTxnKV, func() {
etcdCli.Close()
......@@ -194,11 +194,11 @@ func TestMetaTable(t *testing.T) {
require.Nil(t, err)
defer etcdCli.Close()
skv, err := kvmetestore.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
skv, err := rootcoord.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
txnKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
mt, err := NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: txnKV, Snapshot: skv})
mt, err := NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: txnKV, Snapshot: skv})
assert.Nil(t, err)
collInfo := &model.Collection{
......@@ -554,7 +554,7 @@ func TestMetaTable(t *testing.T) {
remove: func(key string) error { return txnkv.Remove(key) },
}
mt, err = NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: mockTxnKV, Snapshot: mockKV})
mt, err = NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: mockTxnKV, Snapshot: mockKV})
assert.Nil(t, err)
wg.Add(1)
......@@ -1206,9 +1206,9 @@ func TestRbacSelectRole(t *testing.T) {
assert.Equal(t, 2, len(results[0].Users))
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
if key == kvmetestore.RoleMappingPrefix {
if key == rootcoord.RoleMappingPrefix {
return []string{key + "/user1/role2", key + "/user2/role2", key + "/user1/role1", key + "/user2/role1"}, []string{"value1", "value2", "values3", "value4"}, nil
} else if key == kvmetestore.RolePrefix {
} else if key == rootcoord.RolePrefix {
return []string{key + "/role1", key + "/role2", key + "/role3"}, []string{"value1", "value2", "values3"}, nil
} else {
return []string{}, []string{}, fmt.Errorf("load with prefix error")
......@@ -1266,14 +1266,14 @@ func TestRbacSelectUser(t *testing.T) {
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
logger.Debug("simfg", zap.String("key", key))
if strings.Contains(key, kvmetestore.RoleMappingPrefix) {
if strings.Contains(key, rootcoord.RoleMappingPrefix) {
if strings.Contains(key, "user1") {
return []string{key + "/role2", key + "/role1", key + "/role3"}, []string{"value1", "value4", "value2"}, nil
} else if strings.Contains(key, "user2") {
return []string{key + "/role2"}, []string{"value1"}, nil
}
return []string{}, []string{}, nil
} else if key == kvmetestore.CredentialPrefix {
} else if key == rootcoord.CredentialPrefix {
return []string{key + "/user1", key + "/user2", key + "/user3"}, []string{string(credentialInfoByte), string(credentialInfoByte), string(credentialInfoByte)}, nil
} else {
return []string{}, []string{}, fmt.Errorf("load with prefix error")
......@@ -1562,11 +1562,11 @@ func TestMetaWithTimestamp(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
skv, err := kvmetestore.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
skv, err := rootcoord.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
txnKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
mt, err := NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: txnKV, Snapshot: skv})
mt, err := NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: txnKV, Snapshot: skv})
assert.Nil(t, err)
collInfo := &model.Collection{
......@@ -1726,16 +1726,16 @@ func TestFixIssue10540(t *testing.T) {
assert.Nil(t, err)
defer etcdCli.Close()
skv, err := kvmetestore.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
skv, err := rootcoord.NewMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7)
assert.Nil(t, err)
assert.NotNil(t, skv)
//txnKV := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath)
txnKV := memkv.NewMemoryKV()
// compose rc7 legace tombstone cases
txnKV.Save(path.Join(kvmetestore.SegmentIndexMetaPrefix, "2"), string(kvmetestore.SuffixSnapshotTombstone))
txnKV.Save(path.Join(kvmetestore.IndexMetaPrefix, "3"), string(kvmetestore.SuffixSnapshotTombstone))
txnKV.Save(path.Join(rootcoord.SegmentIndexMetaPrefix, "2"), string(rootcoord.SuffixSnapshotTombstone))
txnKV.Save(path.Join(rootcoord.IndexMetaPrefix, "3"), string(rootcoord.SuffixSnapshotTombstone))
_, err = NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: txnKV, Snapshot: skv})
_, err = NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: txnKV, Snapshot: skv})
assert.Nil(t, err)
}
......@@ -2075,7 +2075,7 @@ func TestMetaTable_AlignSegmentsMeta(t *testing.T) {
indexID = UniqueID(1000)
)
mt := &MetaTable{
catalog: &kvmetestore.Catalog{
catalog: &rootcoord.Catalog{
Txn: &mockTestTxnKV{
multiRemove: func(keys []string) error {
return nil
......@@ -2117,7 +2117,7 @@ func TestMetaTable_AlignSegmentsMeta(t *testing.T) {
return fmt.Errorf("error occurred")
},
}
mt.catalog = &kvmetestore.Catalog{Txn: txn}
mt.catalog = &rootcoord.Catalog{Txn: txn}
mt.AlignSegmentsMeta(collID, partID, map[UniqueID]struct{}{103: {}, 104: {}, 105: {}})
})
}
......@@ -2237,7 +2237,7 @@ func TestMetaTable_MarkIndexDeleted(t *testing.T) {
type MockedCatalog struct {
mock.Mock
metastore.Catalog
metastore.RootCoordCatalog
alterIndexParamsVerification func(ctx context.Context, oldIndex *model.Index, newIndex *model.Index, alterType metastore.AlterType)
createIndexParamsVerification func(ctx context.Context, col *model.Collection, index *model.Index)
dropIndexParamsVerification func(ctx context.Context, collectionInfo *model.Collection, dropIdxID typeutil.UniqueID)
......
......@@ -23,10 +23,10 @@ import (
"path"
"sync"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -77,7 +77,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
// WatchProxy starts a goroutine to watch proxy session changes on etcd
func (p *proxyManager) WatchProxy() error {
ctx, cancel := context.WithTimeout(p.ctx, kv.RequestTimeout)
ctx, cancel := context.WithTimeout(p.ctx, rootcoord.RequestTimeout)
defer cancel()
sessions, rev, err := p.getSessionsOnEtcd(ctx)
......@@ -210,7 +210,7 @@ func (p *proxyManager) Stop() {
// listProxyInEtcd helper function lists proxy in etcd
func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sessionutil.Session, error) {
ctx2, cancel := context.WithTimeout(ctx, kv.RequestTimeout)
ctx2, cancel := context.WithTimeout(ctx, rootcoord.RequestTimeout)
defer cancel()
resp, err := cli.Get(
ctx2,
......
......@@ -34,10 +34,10 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/db"
"github.com/milvus-io/milvus/internal/metastore/db/dao"
"github.com/milvus-io/milvus/internal/metastore/db/dbcore"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
rootcoord2 "github.com/milvus-io/milvus/internal/metastore/db/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/metrics"
ms "github.com/milvus-io/milvus/internal/mq/msgstream"
......@@ -1157,7 +1157,7 @@ func (c *Core) Init() error {
return initError
}
var catalog metastore.Catalog
var catalog metastore.RootCoordCatalog
switch Params.MetaStoreCfg.MetaStoreType {
case util.MetaStoreTypeEtcd:
var metaKV kv.TxnKV
......@@ -1167,13 +1167,13 @@ func (c *Core) Init() error {
return initError
}
var ss *kvmetestore.SuffixSnapshot
if ss, initError = kvmetestore.NewSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil {
var ss *rootcoord.SuffixSnapshot
if ss, initError = rootcoord.NewSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil {
log.Error("RootCoord failed to new suffixSnapshot", zap.Error(initError))
return initError
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
catalog = &rootcoord.Catalog{Txn: metaKV, Snapshot: ss}
case util.MetaStoreTypeMysql:
// connect to database
err := dbcore.Connect(&Params.DBCfg)
......@@ -1181,7 +1181,7 @@ func (c *Core) Init() error {
return err
}
catalog = db.NewTableCatalog(dbcore.NewTxImpl(), dao.NewMetaDomain())
catalog = rootcoord2.NewTableCatalog(dbcore.NewTxImpl(), dao.NewMetaDomain())
default:
return fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType)
}
......
......@@ -33,7 +33,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -783,11 +783,11 @@ func TestRootCoordInitData(t *testing.T) {
err = core.MetaTable.DeleteCredential(util.UserRoot)
assert.NoError(t, err)
snapshotKV, err := kvmetestore.NewMetaSnapshot(etcdCli, Params.EtcdCfg.MetaRootPath, TimestampPrefix, 7)
snapshotKV, err := rootcoord.NewMetaSnapshot(etcdCli, Params.EtcdCfg.MetaRootPath, TimestampPrefix, 7)
assert.NotNil(t, snapshotKV)
assert.NoError(t, err)
txnKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
mt, err := NewMetaTable(context.TODO(), &kvmetestore.Catalog{Txn: txnKV, Snapshot: snapshotKV})
mt, err := NewMetaTable(context.TODO(), &rootcoord.Catalog{Txn: txnKV, Snapshot: snapshotKV})
assert.NoError(t, err)
mockTxnKV := &mockTestTxnKV{
TxnKV: txnKV,
......@@ -798,7 +798,7 @@ func TestRootCoordInitData(t *testing.T) {
load: func(key string) (string, error) { return txnKV.Load(key) },
}
//mt.txn = mockTxnKV
mt.catalog = &kvmetestore.Catalog{Txn: mockTxnKV, Snapshot: snapshotKV}
mt.catalog = &rootcoord.Catalog{Txn: mockTxnKV, Snapshot: snapshotKV}
core.MetaTable = mt
err = core.initData()
assert.Error(t, err)
......@@ -2255,7 +2255,7 @@ func TestRootCoord_Base(t *testing.T) {
p2 := sessionutil.Session{
ServerID: 101,
}
ctx2, cancel2 := context.WithTimeout(ctx, kvmetestore.RequestTimeout)
ctx2, cancel2 := context.WithTimeout(ctx, rootcoord.RequestTimeout)
defer cancel2()
s1, err := json.Marshal(&p1)
assert.NoError(t, err)
......
......@@ -37,6 +37,8 @@ const (
DDLBinlog
// IndexFileBinlog BinlogType for index
IndexFileBinlog
// StatsBinlog BinlogType for stats data
StatsBinlog
)
const (
// MagicNumber used in binlog
......
package typeutil
// MergeMap merge one map to another
func MergeMap(src map[string]string, dst map[string]string) map[string]string {
for k, v := range src {
dst[k] = v
}
return dst
}
// GetMapKeys return keys of a map
func GetMapKeys(src map[string]string) []string {
keys := make([]string, 0, len(src))
for k := range src {
keys = append(keys, k)
}
return keys
}
package typeutil
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMergeMap(t *testing.T) {
src := make(map[string]string)
src["Alice"] = "female"
src["Bob"] = "male"
dst := make(map[string]string)
dst = MergeMap(src, dst)
assert.EqualValues(t, dst, src)
src = nil
dst = nil
dst = MergeMap(src, dst)
assert.Nil(t, dst)
}
func TestGetMapKeys(t *testing.T) {
src := make(map[string]string)
src["Alice"] = "female"
src["Bob"] = "male"
keys := GetMapKeys(src)
assert.Equal(t, 2, len(keys))
assert.Contains(t, keys, "Alice", "Bob")
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册