提交 b597a637 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Add datanode metatable tests

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 305c8142
......@@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
func (alloc *allocatorImpl) allocID() (UniqueID, error) {
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgType: commonpb.MsgType_kRequestID,
MsgID: 1, // GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,
......
package datanode
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"strconv"
"testing"
"time"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/master"
)
func makeNewChannelNames(names []string, suffix string) []string {
......@@ -36,52 +31,10 @@ func refreshChannelNames() {
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
}
func startMaster(ctx context.Context) {
master.Init()
etcdAddr := master.Params.EtcdAddress
metaRootPath := master.Params.MetaRootPath
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil {
panic(err)
}
_, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}
masterPort := 53101
master.Params.Port = masterPort
svr, err := master.CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
if err := svr.Run(int64(master.Params.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}
fmt.Println("Waiting for server!", svr.IsServing())
Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort)
}
func TestMain(m *testing.M) {
Params.Init()
refreshChannelNames()
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
startMaster(ctx)
exitCode := m.Run()
os.Exit(exitCode)
}
......
......@@ -12,9 +12,9 @@ import (
)
type metaTable struct {
client kv.TxnBase //
client kv.Base //
segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta
lock sync.RWMutex
}
......@@ -36,24 +36,6 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) {
return mt, nil
}
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.collID2DdlMeta[collID]
if !ok {
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
CollectionID: collID,
BinlogPaths: make([]string, 0),
}
}
meta := mt.collID2DdlMeta[collID]
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
return mt.saveDDLFlushMeta(meta)
}
func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error {
_, ok := mt.segID2FlushMeta[segmentID]
if !ok {
......@@ -97,31 +79,23 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error {
return mt.saveSegFlushMeta(meta)
}
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.collID2DdlMeta[meta.CollectionID] = meta
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadSegMetaFromKV() error {
mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
func (mt *metaTable) reloadDdlMetaFromKV() error {
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
_, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath)
if err != nil {
return err
}
for _, value := range values {
ddlMeta := &datapb.DDLFlushMeta{}
err = proto.UnmarshalText(value, ddlMeta)
flushMeta := &datapb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, flushMeta)
if err != nil {
return err
}
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta
}
return nil
}
......@@ -135,26 +109,6 @@ func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadSegMetaFromKV() error {
mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
_, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath)
if err != nil {
return err
}
for _, value := range values {
flushMeta := &datapb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, flushMeta)
if err != nil {
return err
}
mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta
}
return nil
}
func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
......@@ -197,6 +151,61 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
return ret, nil
}
// --- DDL ---
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.collID2DdlMeta[collID]
if !ok {
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
CollectionID: collID,
BinlogPaths: make([]string, 0),
}
}
meta := mt.collID2DdlMeta[collID]
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
return mt.saveDDLFlushMeta(meta)
}
func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool {
mt.lock.RLock()
defer mt.lock.RUnlock()
_, ok := mt.collID2DdlMeta[collID]
return ok
}
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.collID2DdlMeta[meta.CollectionID] = meta
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadDdlMetaFromKV() error {
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
if err != nil {
return err
}
for _, value := range values {
ddlMeta := &datapb.DDLFlushMeta{}
err = proto.UnmarshalText(value, ddlMeta)
if err != nil {
return err
}
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
}
return nil
}
func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
......
package datanode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
)
func TestMetaTable_all(t *testing.T) {
func TestMetaTable_SegmentFlush(t *testing.T) {
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
require.NoError(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix())
require.NoError(t, err)
meta, err := NewMetaTable(etcdKV)
kvMock := memkv.NewMemoryKV()
meta, err := NewMetaTable(kvMock)
assert.NoError(t, err)
defer meta.client.Close()
......@@ -65,8 +55,37 @@ func TestMetaTable_all(t *testing.T) {
ret)
})
t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
var segmentID UniqueID = 401
err := meta.addSegmentFlush(segmentID)
assert.NoError(t, err)
ret, err := meta.checkFlushComplete(segmentID)
assert.NoError(t, err)
assert.Equal(t, false, ret)
meta.CompleteFlush(segmentID)
ret, err = meta.checkFlushComplete(segmentID)
assert.NoError(t, err)
assert.Equal(t, true, ret)
})
}
func TestMetaTable_DDLFlush(t *testing.T) {
kvMock := memkv.NewMemoryKV()
meta, err := NewMetaTable(kvMock)
assert.NoError(t, err)
defer meta.client.Close()
t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
assert.False(t, meta.hasDDLFlushMeta(301))
assert.False(t, meta.hasDDLFlushMeta(302))
collID2Paths := map[UniqueID][]string{
301: {"a", "b", "c"},
302: {"c", "b", "a"},
......@@ -84,24 +103,8 @@ func TestMetaTable_all(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, map[UniqueID][]string{k: v}, ret)
}
})
t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
var segmentID UniqueID = 401
err := meta.addSegmentFlush(segmentID)
assert.NoError(t, err)
ret, err := meta.checkFlushComplete(segmentID)
assert.NoError(t, err)
assert.Equal(t, false, ret)
meta.CompleteFlush(segmentID)
ret, err = meta.checkFlushComplete(segmentID)
assert.NoError(t, err)
assert.Equal(t, true, ret)
assert.True(t, meta.hasDDLFlushMeta(301))
assert.True(t, meta.hasDDLFlushMeta(302))
})
}
......@@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast
# TODO: remove to distributed
#go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/master/..." -failfast
#go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册