未验证 提交 675b71cc 编写于 作者: C congqixia 提交者: GitHub

Replace metaSnapshot with suffixSnapshot (#7975)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 9a6f2e61
......@@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/golang/protobuf/proto"
......@@ -86,11 +87,13 @@ type Core struct {
ctx context.Context
cancel context.CancelFunc
etcdCli *clientv3.Client
kvBase *etcdkv.EtcdKV
kvBase kv.TxnKV //*etcdkv.EtcdKV
//DDL lock
ddlLock sync.Mutex
kvBaseCreate func(root string) (kv.TxnKV, error)
//setMsgStreams, send time tick into dd channel and time tick channel
SendTimeTick func(t typeutil.Timestamp, reason string) error
......@@ -855,24 +858,34 @@ func (c *Core) Register() error {
func (c *Core) Init() error {
var initError error = nil
if c.kvBaseCreate == nil {
c.kvBaseCreate = func(root string) (kv.TxnKV, error) {
return etcdkv.NewEtcdKV(Params.EtcdEndpoints, root)
}
}
c.initOnce.Do(func() {
connectEtcdFn := func() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil {
log.Error("RootCoord, Failed to new Etcd client", zap.Any("reason", initError))
return initError
}
var ms *metaSnapshot
ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024)
if c.kvBase, initError = c.kvBaseCreate(Params.KvRootPath); initError != nil {
log.Error("RootCoord, Failed to new EtcdKV", zap.Any("reason", initError))
return initError
}
var metaKV kv.TxnKV
metaKV, initError = c.kvBaseCreate(Params.MetaRootPath)
if initError != nil {
log.Error("RootCoord, Failed to new MetaSnapshot", zap.Any("reason", initError))
log.Error("RootCoord, Failed to new EtcdKV", zap.Any("reason", initError))
return initError
}
if c.MetaTable, initError = NewMetaTable(ms); initError != nil {
log.Error("RootCoord, Failed to new MetaTable", zap.Any("reason", initError))
var ss *suffixSnapshot
if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.MetaRootPath, "snapshots"); initError != nil {
log.Error("RootCoord, Failed to new suffixSnapshot", zap.Error(initError))
return initError
}
if c.kvBase, initError = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.KvRootPath); initError != nil {
log.Error("RootCoord, Failed to new EtcdKV", zap.Any("reason", initError))
if c.MetaTable, initError = NewMetaTable(ss); initError != nil {
log.Error("RootCoord, Failed to new MetaTable", zap.Any("reason", initError))
return initError
}
......
......@@ -24,11 +24,14 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -43,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
......@@ -395,6 +399,110 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
return nil
}
// a mock kv that always fail when LoadWithPrefix
type loadPrefixFailKV struct {
kv.TxnKV
}
// LoadWithPrefix override behavior
func (kv *loadPrefixFailKV) LoadWithPrefix(key string) ([]string, []string, error) {
return []string{}, []string{}, retry.NoRetryError(errors.New("mocked fail"))
}
func TestRootCoordInit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
coreFactory := msgstream.NewPmsFactory()
Params.Init()
Params.DmlChannelNum = TestDMLChannelNum
core, err := NewCore(ctx, coreFactory)
require.Nil(t, err)
assert.Nil(t, err)
randVal := rand.Int()
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
// inject kvBaseCreate fail
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
assert.Nil(t, err)
randVal = rand.Int()
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
return nil, retry.NoRetryError(errors.New("injected"))
}
err = core.Init()
assert.NotNil(t, err)
// inject metaKV create fail
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
assert.Nil(t, err)
randVal = rand.Int()
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(root string) (kv.TxnKV, error) {
if root == Params.MetaRootPath {
return nil, retry.NoRetryError(errors.New("injected"))
}
return memkv.NewMemoryKV(), nil
}
err = core.Init()
assert.NotNil(t, err)
// inject newSuffixSnapshot failure
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
assert.Nil(t, err)
randVal = rand.Int()
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
return nil, nil
}
err = core.Init()
assert.NotNil(t, err)
// inject newMetaTable failure
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
assert.Nil(t, err)
randVal = rand.Int()
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
kv := memkv.NewMemoryKV()
return &loadPrefixFailKV{TxnKV: kv}, nil
}
err = core.Init()
assert.NotNil(t, err)
}
func TestRootCoord(t *testing.T) {
const (
dbName = "testDb"
......
......@@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
......@@ -74,6 +75,9 @@ var _ kv.SnapShotKV = (*suffixSnapshot)(nil)
// newSuffixSnapshot creates a newSuffixSnapshot with provided kv
func newSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*suffixSnapshot, error) {
if txnKV == nil {
return nil, retry.NoRetryError(errors.New("txnKV is nil"))
}
// handles trailing / logic
tk := path.Join(snapshot, "k")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册