未验证 提交 840e82ac 编写于 作者: J jaime 提交者: GitHub

Improve LoadWithPrefix performance for SuffixSnapshot (#21601)

Signed-off-by: Njaime <yun.zhang@zilliz.com>
上级 187aff68
......@@ -27,18 +27,19 @@ import (
"strings"
"sync"
"github.com/milvus-io/milvus/internal/common"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
var (
// SuffixSnapshotTombstone is the special value used as a tombstone mark.
// The three bytes are the UTF-8 encoding of U+26FC.
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
// PaginationSize is the page size passed to MetaKv.WalkWithPrefix when
// LoadWithPrefix walks the snapshot keys under a prefix.
PaginationSize = 2000
)
// IsTombstone used in migration tool also.
......@@ -56,7 +57,7 @@ func ConstructTombstone() []byte {
// SuffixSnapshot records the timestamp as a suffix of a key ("key[sep]ts") under the Snapshot prefix path
type SuffixSnapshot struct {
// internal kv which SuffixSnapshot based on
kv.TxnKV
kv.MetaKv
// rw mutex provided range lock
sync.RWMutex
// lastestTS latest timestamp for each key
......@@ -91,9 +92,9 @@ type tsv struct {
var _ kv.SnapShotKV = (*SuffixSnapshot)(nil)
// NewSuffixSnapshot creates a SuffixSnapshot with the provided kv
func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnapshot, error) {
if txnKV == nil {
return nil, retry.Unrecoverable(errors.New("txnKV is nil"))
func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSnapshot, error) {
if metaKV == nil {
return nil, retry.Unrecoverable(errors.New("MetaKv is nil"))
}
// handles trailing / logic
......@@ -104,8 +105,8 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
tk = path.Join(root, "k")
rootLen := len(tk) - 1
return &SuffixSnapshot{
TxnKV: txnKV,
ss := &SuffixSnapshot{
MetaKv: metaKV,
lastestTS: make(map[string]typeutil.Timestamp),
separator: sep,
exp: regexp.MustCompile(fmt.Sprintf(`^(.+)%s(\d+)$`, sep)),
......@@ -113,7 +114,8 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
snapshotLen: snapshotLen,
rootPrefix: root,
rootLen: rootLen,
}, nil
}
return ss, nil
}
// isTombstone helper function to check whether is tombstone mark
......@@ -200,9 +202,9 @@ func (ss *SuffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, e
// loadLatestTS loads the latest ts for the specified key
func (ss *SuffixSnapshot) loadLatestTS(key string) error {
prefix := ss.composeSnapshotPrefix(key)
keys, _, err := ss.TxnKV.LoadWithPrefix(prefix)
keys, _, err := ss.MetaKv.LoadWithPrefix(prefix)
if err != nil {
log.Warn("SuffixSnapshot txnkv LoadWithPrefix failed", zap.String("key", key),
log.Warn("SuffixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key),
zap.Error(err))
return err
}
......@@ -259,14 +261,14 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) {
}
// Save stores key-value pairs with timestamp
// if ts is 0, SuffixSnapshot works as a TxnKV
// if ts is 0, SuffixSnapshot works as a MetaKv
// otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path
// and for acceleration store original key-value if ts is the latest
func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKv
// if ts == 0, act like MetaKv
// will not update lastestTS since ts is not valid
if ts == 0 {
return ss.TxnKV.Save(key, value)
return ss.MetaKv.Save(key, value)
}
ss.Lock()
......@@ -281,7 +283,7 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
return err
}
if after {
err := ss.TxnKV.MultiSave(map[string]string{
err := ss.MetaKv.MultiSave(map[string]string{
key: value,
tsKey: value,
})
......@@ -293,14 +295,14 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
}
// modifying history key, just save tskey-value
return ss.TxnKV.Save(tsKey, value)
return ss.MetaKv.Save(tsKey, value)
}
func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
// if ts == 0, load latest by definition
// and with acceleration logic, just do load key will do
if ts == 0 {
value, err := ss.TxnKV.Load(key)
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
}
......@@ -318,7 +320,7 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
return "", err
}
if after {
value, err := ss.TxnKV.Load(key)
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
}
......@@ -327,9 +329,9 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
// before ts, do time travel
// 1. load all tsKey with key/ prefix
keys, values, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(key))
keys, values, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(key))
if err != nil {
log.Warn("prefixSnapshot txnKV LoadWithPrefix failed", zap.String("key", key), zap.Error(err))
log.Warn("prefixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key), zap.Error(err))
return "", err
}
......@@ -367,12 +369,12 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
}
// MultiSave save multiple kvs
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
// each key-value will be treated using same logic like Save
func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
if ts == 0 {
return ss.TxnKV.MultiSave(kvs)
return ss.MetaKv.MultiSave(kvs)
}
ss.Lock()
defer ss.Unlock()
......@@ -385,7 +387,7 @@ func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp
}
// multi save execute map; if succeeds, update ts in the update list
err = ss.TxnKV.MultiSave(execute)
err = ss.MetaKv.MultiSave(execute)
if err == nil {
for _, key := range updateList {
ss.lastestTS[key] = ts
......@@ -424,7 +426,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil
func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
// ts 0 case shall be treated as fetch latest/current value
if ts == 0 {
keys, values, err := ss.TxnKV.LoadWithPrefix(key)
keys, values, err := ss.MetaKv.LoadWithPrefix(key)
fks := keys[:0] //make([]string, 0, len(keys))
fvs := values[:0] //make([]string, 0, len(values))
// hide rootPrefix from return value
......@@ -441,71 +443,65 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
ss.Lock()
defer ss.Unlock()
keys, values, err := ss.TxnKV.LoadWithPrefix(key)
if err != nil {
return nil, nil, err
}
resultKeys := make([]string, 0)
resultValues := make([]string, 0)
latestOriginalKey := ""
tValueGroups := make([]tsv, 0)
// kv group stands for
type kvgroup struct {
key, value string
processed bool
tsRecords []tsv
prefix := path.Join(ss.snapshotPrefix, key)
appendResultFn := func(ts typeutil.Timestamp) {
value, ok := binarySearchRecords(tValueGroups, ts)
if !ok || ss.isTombstone(value) {
return
}
resultKeys = append(resultKeys, latestOriginalKey)
resultValues = append(resultValues, value)
}
groups := make([]kvgroup, 0, len(keys))
for i, key := range keys {
group := kvgroup{key: key, value: values[i]}
// load prefix keys contains rootPrefix
sKeys, sValues, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key)))
err := ss.MetaKv.WalkWithPrefix(prefix, PaginationSize, func(k []byte, v []byte) error {
sKey := string(k)
sValue := string(v)
snapshotKey := ss.hideRootPrefix(sKey)
curOriginalKey, err := ss.getOriginalKey(snapshotKey)
if err != nil {
return nil, nil, err
}
group.tsRecords = make([]tsv, 0, len(sKeys))
for j, sKey := range sKeys {
ts, ok := ss.isTSOfKey(ss.hideRootPrefix(sKey), ss.hideRootPrefix(key))
if ok {
group.tsRecords = append(group.tsRecords, tsv{ts: ts, value: sValues[j]})
}
return err
}
groups = append(groups, group)
}
resultKeys := make([]string, 0, len(groups))
resultValues := make([]string, 0, len(groups))
// for each group, do ts travel logic if appliable
for _, group := range groups {
if len(group.tsRecords) == 0 {
// not ts maybe, just use k,v
resultKeys = append(resultKeys, group.key)
resultValues = append(resultValues, group.value)
continue
// reset if starting look up a new key group
if latestOriginalKey != "" && latestOriginalKey != curOriginalKey {
appendResultFn(ts)
tValueGroups = make([]tsv, 0)
}
value, ok := binarySearchRecords(group.tsRecords, ts)
if ok {
// tombstone found, skip entry
if ss.isTombstone(value) {
continue
}
resultKeys = append(resultKeys, group.key)
resultValues = append(resultValues, value)
targetTs, ok := ss.isTSKey(snapshotKey)
if !ok {
log.Warn("skip key because it doesn't contain ts", zap.String("key", key))
return nil
}
}
// hide rootPrefix from return value
for i, k := range resultKeys {
resultKeys[i] = ss.hideRootPrefix(k)
tValueGroups = append(tValueGroups, tsv{value: sValue, ts: targetTs})
latestOriginalKey = curOriginalKey
return nil
})
if err != nil {
return nil, nil, err
}
appendResultFn(ts)
return resultKeys, resultValues, nil
}
// MultiSaveAndRemoveWithPrefix saves multiple kvs and removes keys by prefix as well
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
// each key-value will be treated in same logic like Save
func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
if ts == 0 {
return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals)
return ss.MetaKv.MultiSaveAndRemoveWithPrefix(saves, removals)
}
ss.Lock()
defer ss.Unlock()
......@@ -519,9 +515,9 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
// load each removal, change execution to adding tombstones
for _, removal := range removals {
keys, _, err := ss.TxnKV.LoadWithPrefix(removal)
keys, _, err := ss.MetaKv.LoadWithPrefix(removal)
if err != nil {
log.Warn("SuffixSnapshot TxnKV LoadwithPrefix failed", zap.String("key", removal), zap.Error(err))
log.Warn("SuffixSnapshot MetaKv LoadwithPrefix failed", zap.String("key", removal), zap.Error(err))
return err
}
......@@ -535,7 +531,7 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
}
// multi save execute map; if succeeds, update ts in the update list
err = ss.TxnKV.MultiSave(execute)
err = ss.MetaKv.MultiSave(execute)
if err == nil {
for _, key := range updateList {
ss.lastestTS[key] = ts
......@@ -543,3 +539,16 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
}
return err
}
// getOriginalKey recovers the original key from a snapshot key.
//
// snapshotKey is expected to have the form
// "<snapshotPrefix><original key><separator><suffix>"; the returned value is
// the text between the snapshot prefix and the LAST occurrence of the
// separator.
//
// An error is returned when snapshotKey does not start with ss.snapshotPrefix,
// or when no separator occurs after the prefix portion of the key.
func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) {
	if !strings.HasPrefix(snapshotKey, ss.snapshotPrefix) {
		return "", fmt.Errorf("get original key failed, invailed snapshot key:%s", snapshotKey)
	}

	// The suffix is introduced by the last separator in the key.
	idx := strings.LastIndex(snapshotKey, ss.separator)
	// idx == -1: no separator at all.
	// idx < snapshotLen: the only match lies inside the prefix itself, so there
	// is no separator in the key part; slicing below would otherwise panic.
	// NOTE(review): assumes ss.snapshotLen is the length of ss.snapshotPrefix — confirm in the constructor.
	if idx == -1 || idx < ss.snapshotLen {
		return "", fmt.Errorf("get original key failed, snapshot key:%s", snapshotKey)
	}

	return snapshotKey[ss.snapshotLen:idx], nil
}
......@@ -17,16 +17,20 @@
package rootcoord
import (
"errors"
"fmt"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
......@@ -389,9 +393,9 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
keys, vals, err = ss.LoadWithPrefix("k", typeutil.Timestamp(300))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 3)
assert.ElementsMatch(t, keys, []string{"k1", "k2", "kextra"})
assert.ElementsMatch(t, vals, []string{"v1-19", "v2-19", "extra-value"})
assert.Equal(t, len(keys), 2)
assert.ElementsMatch(t, keys, []string{"k1", "k2"})
assert.ElementsMatch(t, vals, []string{"v1-19", "v2-19"})
// clean up
ss.RemoveWithPrefix("")
......@@ -492,3 +496,92 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
// cleanup
ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0)
}
// TestSuffixSnapshot_LoadWithPrefix covers two paths of
// SuffixSnapshot.LoadWithPrefix when it is called with a non-zero timestamp:
// a stored snapshot key whose suffix is not a timestamp, and an error
// returned by the underlying WalkWithPrefix.
func TestSuffixSnapshot_LoadWithPrefix(t *testing.T) {
	rand.Seed(time.Now().UnixNano())
	randVal := rand.Int()

	Params.Init()
	rootPath := fmt.Sprintf("/test/meta/loadWithPrefix-test-%d", randVal)
	sep := "_ts"

	etcdCli, err := etcd.GetEtcdClient(
		Params.EtcdCfg.UseEmbedEtcd,
		Params.EtcdCfg.EtcdUseSSL,
		Params.EtcdCfg.Endpoints,
		Params.EtcdCfg.EtcdTLSCert,
		Params.EtcdCfg.EtcdTLSKey,
		Params.EtcdCfg.EtcdTLSCACert,
		Params.EtcdCfg.EtcdTLSMinVersion)
	// Stop immediately on failure: everything below, including the deferred
	// Close, needs a usable client.
	require.NoError(t, err)
	defer etcdCli.Close()

	// Named metaKV (not etcdkv) so the imported etcdkv package is not shadowed.
	metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
	defer metaKV.Close()

	ss, err := NewSuffixSnapshot(metaKV, sep, rootPath, snapshotPrefix)
	require.NoError(t, err)
	require.NotNil(t, ss)
	defer ss.Close()

	t.Run("parse ts fail", func(t *testing.T) {
		prefix := fmt.Sprintf("prefix%d", rand.Int())
		key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts")
		// Store an entry under the snapshot path whose suffix carries no
		// numeric timestamp; LoadWithPrefix is expected to return nothing for it.
		err := metaKV.Save(ss.composeSnapshotPrefix(key), "")
		assert.NoError(t, err)

		keys, values, err := ss.LoadWithPrefix(prefix, 100)
		assert.NoError(t, err)
		assert.Equal(t, 0, len(keys))
		assert.Equal(t, 0, len(values))

		// clean all data
		err = metaKV.RemoveWithPrefix("")
		assert.NoError(t, err)
	})

	t.Run("test walk kv data fail", func(t *testing.T) {
		sep := "_ts"
		rootPath := "root/"
		kv := mocks.NewMetaKv(t)
		// Any WalkWithPrefix call fails; LoadWithPrefix must report an error
		// with nil results.
		kv.EXPECT().
			WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).
			Return(errors.New("error"))

		ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix)
		assert.NotNil(t, ss)
		assert.NoError(t, err)

		keys, values, err := ss.LoadWithPrefix("t", 100)
		assert.Error(t, err)
		assert.Nil(t, keys)
		assert.Nil(t, values)
	})
}
// Test_getOriginalKey checks getOriginalKey against a key with the wrong
// prefix, a key without a separator, and a well-formed snapshot key.
func Test_getOriginalKey(t *testing.T) {
	const (
		separator = "_ts"
		root      = "root/"
	)

	metaKV := mocks.NewMetaKv(t)
	snapshot, err := NewSuffixSnapshot(metaKV, separator, root, snapshotPrefix)
	assert.NotNil(t, snapshot)
	assert.NoError(t, err)

	cases := []struct {
		name    string
		input   string
		want    string
		wantErr bool
	}{
		{name: "match prefix fail", input: "non-snapshots/k1", want: "", wantErr: true},
		{name: "find separator fail", input: "snapshots/k1", want: "", wantErr: true},
		{name: "ok", input: "snapshots/prefix-1_ts438497159122780160", want: "prefix-1", wantErr: false},
	}

	for _, tc := range cases {
		// Sub-tests run synchronously, so using tc inside the closure is safe.
		t.Run(tc.name, func(t *testing.T) {
			got, gotErr := snapshot.getOriginalKey(tc.input)
			assert.Equal(t, tc.want, got)
			if tc.wantErr {
				assert.Error(t, gotErr)
				return
			}
			assert.NoError(t, gotErr)
		})
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册