diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index d19d383e492bea1a6187810cdf0ddefa68eaeee8..bc0da6a211579340ff5091c2b2253670676daa65 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -360,7 +360,7 @@ func TestGrpcService(t *testing.T) { }) t.Run("describe collection", func(t *testing.T) { - collMeta, err := core.MetaTable.GetCollectionByName("testColl") + collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) req := &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ @@ -411,10 +411,10 @@ func TestGrpcService(t *testing.T) { status, err := cli.CreatePartition(ctx, req) assert.Nil(t, err) assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success) - collMeta, err := core.MetaTable.GetCollectionByName("testColl") + collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) assert.Equal(t, len(collMeta.PartitionIDs), 2) - partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[1]) + partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[1], 0) assert.Nil(t, err) assert.Equal(t, partMeta.PartitionName, "testPartition") @@ -440,7 +440,7 @@ func TestGrpcService(t *testing.T) { }) t.Run("show partition", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName("testColl") + coll, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) req := &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ @@ -461,10 +461,10 @@ func TestGrpcService(t *testing.T) { }) t.Run("show segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName("testColl") + coll, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) partID := coll.PartitionIDs[1] - part, err := core.MetaTable.GetPartitionByID(partID) + part, err := core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Zero(t, len(part.SegmentIDs)) seg := &datapb.SegmentInfo{ @@ -474,7 +474,7 @@ func TestGrpcService(t *testing.T) { } core.DataServiceSegmentChan <- seg time.Sleep(time.Millisecond * 100) - part, err = core.MetaTable.GetPartitionByID(partID) + part, err = core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, len(part.SegmentIDs), 1) @@ -513,13 +513,13 @@ func TestGrpcService(t *testing.T) { }, }, } - collMeta, err := core.MetaTable.GetCollectionByName("testColl") + collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) assert.Equal(t, len(collMeta.FieldIndexes), 0) rsp, err := cli.CreateIndex(ctx, req) assert.Nil(t, err) assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_Success) - collMeta, err = core.MetaTable.GetCollectionByName("testColl") + collMeta, err = core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) assert.Equal(t, len(collMeta.FieldIndexes), 1) @@ -535,7 +535,7 @@ func TestGrpcService(t *testing.T) { }) t.Run("describe segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName("testColl") + coll, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) req := &milvuspb.DescribeSegmentRequest{ @@ -575,10 +575,10 @@ func TestGrpcService(t *testing.T) { }) t.Run("flush segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName("testColl") + coll, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) partID := 
coll.PartitionIDs[1] - part, err := core.MetaTable.GetPartitionByID(partID) + part, err := core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, len(part.SegmentIDs), 1) seg := &datapb.SegmentInfo{ @@ -588,7 +588,7 @@ func TestGrpcService(t *testing.T) { } core.DataServiceSegmentChan <- seg time.Sleep(time.Millisecond * 100) - part, err = core.MetaTable.GetPartitionByID(partID) + part, err = core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, len(part.SegmentIDs), 2) core.DataNodeSegmentFlushCompletedChan <- 1001 @@ -656,10 +656,10 @@ func TestGrpcService(t *testing.T) { status, err := cli.DropPartition(ctx, req) assert.Nil(t, err) assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_Success) - collMeta, err := core.MetaTable.GetCollectionByName("testColl") + collMeta, err := core.MetaTable.GetCollectionByName("testColl", 0) assert.Nil(t, err) assert.Equal(t, len(collMeta.PartitionIDs), 1) - partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[0]) + partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[0], 0) assert.Nil(t, err) assert.Equal(t, partMeta.PartitionName, cms.Params.DefaultPartitionName) assert.Equal(t, 2, len(collectionMetaCache)) diff --git a/internal/kv/kv.go b/internal/kv/kv.go index cab253a42338da65ae14094c2144a6d4688a127d..98c6e369befa05835344059eb2567bd45ef3bbc0 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -11,6 +11,10 @@ package kv +import ( + "github.com/milvus-io/milvus/internal/util/typeutil" +) + type BaseKV interface { Load(key string) (string, error) MultiLoad(keys []string) ([]string, error) @@ -30,3 +34,11 @@ type TxnKV interface { MultiRemoveWithPrefix(keys []string) error MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error } + +type SnapShotKV interface { + Save(key, value string) (typeutil.Timestamp, error) + Load(key string, ts typeutil.Timestamp) (string, error) + MultiSave(kvs map[string]string) (typeutil.Timestamp, error) + LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) +} diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 0660113412aad20ed1d1b27718a9047dd23940ca..282d3b632dcd1f3218b8b66888365262a1f8cd8f 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -90,7 +90,6 @@ type Core struct { cancel context.CancelFunc etcdCli *clientv3.Client kvBase *etcdkv.EtcdKV - metaKV *etcdkv.EtcdKV //setMsgStreams, receive time tick from proxy service time tick channel ProxyTimeTickChan chan typeutil.Timestamp @@ -191,9 +190,6 @@ func (c *Core) checkInit() error { if c.etcdCli == nil { return fmt.Errorf("etcdCli is nil") } - if c.metaKV == nil { - return fmt.Errorf("metaKV is nil") - } if c.kvBase == nil { return fmt.Errorf("kvBase is nil") } @@ -307,7 +303,7 @@ func (c *Core) startDataServiceSegmentLoop() { } if seg == nil { log.Warn("segment from data service is nil") - } else if err := c.MetaTable.AddSegment(seg); err != nil { + } else if _, err := c.MetaTable.AddSegment(seg); err != nil { //what if master add segment failed, but data service success? 
log.Warn("add segment info meta table failed ", zap.String("error", err.Error())) } else { @@ -387,7 +383,7 @@ func (c *Core) tsLoop() { } func (c *Core) setDdMsgSendFlag(b bool) error { - flag, err := c.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0) if err != nil { return err } @@ -398,9 +394,11 @@ func (c *Core) setDdMsgSendFlag(b bool) error { } if b { - return c.MetaTable.client.Save(DDMsgSendPrefix, "true") + _, err = c.MetaTable.client.Save(DDMsgSendPrefix, "true") + return err } - return c.MetaTable.client.Save(DDMsgSendPrefix, "false") + _, err = c.MetaTable.client.Save(DDMsgSendPrefix, "false") + return err } func (c *Core) setMsgStreams() error { @@ -808,7 +806,7 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, BuildID: bldID, EnableIndex: enableIdx, } - err = c.MetaTable.AddIndex(&seg) + _, err = c.MetaTable.AddIndex(&seg) return err } @@ -819,8 +817,23 @@ func (c *Core) Init() error { if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil { return initError } - c.metaKV = etcdkv.NewEtcdKV(c.etcdCli, Params.MetaRootPath) - if c.MetaTable, initError = NewMetaTable(c.metaKV); initError != nil { + tsAlloc := func() typeutil.Timestamp { + for { + var ts typeutil.Timestamp + var err error + if ts, err = c.tsoAllocator(1); err == nil { + return ts + } + time.Sleep(100 * time.Millisecond) + log.Debug("alloc time stamp error", zap.Error(err)) + } + } + var ms *metaSnapshot + ms, initError = newMetaSnapshot(c.etcdCli, Params.MetaRootPath, TimestampPrefix, 1024, tsAlloc) + if initError != nil { + return initError + } + if c.MetaTable, initError = NewMetaTable(ms); initError != nil { return initError } c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath) @@ -876,13 +889,13 @@ func (c *Core) Init() error { } func (c *Core) reSendDdMsg(ctx context.Context) error { - flag, err := c.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0) if err != nil || flag == "true" { log.Debug("No un-successful DdMsg") return nil } - ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix) + ddOpStr, err := c.MetaTable.client.Load(DDOperationPrefix, 0) if err != nil { log.Debug("DdOperation key does not exist") return nil @@ -1638,27 +1651,29 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste } func (c *Core) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { - respID, err := c.metaKV.Grant(5) + respID, err := c.etcdCli.Grant(c.ctx, 5) if err != nil { fmt.Printf("grant error %s\n", err) return nil, err } c.session.NodeName = nodeName c.session.IP = ip - c.session.LeaseID = respID + c.session.LeaseID = respID.ID sessionJSON, err := json.Marshal(c.session) if err != nil { return nil, err } + ctx, cancel := context.WithTimeout(c.ctx, RequestTimeout) + defer cancel() - err = c.metaKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID) + _, err = c.etcdCli.Put(ctx, fmt.Sprintf("%s/node/%s", Params.MetaRootPath, nodeName), string(sessionJSON), clientv3.WithLease(respID.ID)) if err != nil { fmt.Printf("put lease error %s\n", err) return nil, err } - ch, err := c.metaKV.KeepAlive(respID) + ch, err := c.etcdCli.KeepAlive(c.ctx, respID.ID) if err != nil { fmt.Printf("keep alive error %s\n", err) return nil, err diff --git a/internal/masterservice/master_service_test.go 
b/internal/masterservice/master_service_test.go index 59450646e52b8cf74269df8d8c44d548d10e66d6..fdedbb3656d6ef8ffe97c160eb4a71c861a57ca3 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -358,7 +358,7 @@ func TestMasterService(t *testing.T) { createMsg, ok := (msg.Msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) - createMeta, err := core.MetaTable.GetCollectionByName(collName) + createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, createMeta.ID, createMsg.CollectionID) assert.Equal(t, 1, len(createMeta.PartitionIDs)) @@ -414,15 +414,15 @@ func TestMasterService(t *testing.T) { assert.True(t, ok) createMsg, ok = (msg.Msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) - createMeta, err = core.MetaTable.GetCollectionByName("testColl-again") + createMeta, err = core.MetaTable.GetCollectionByName("testColl-again", 0) assert.Nil(t, err) assert.Equal(t, createMeta.ID, createMsg.CollectionID) // check DD operation info - flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "true", flag) - ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation err = json.Unmarshal([]byte(ddOpStr), &ddOp) @@ -490,7 +490,7 @@ func TestMasterService(t *testing.T) { }) t.Run("describe collection", func(t *testing.T) { - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) req := &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ @@ -544,10 +544,10 @@ func TestMasterService(t *testing.T) { status, err := core.CreatePartition(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 2, len(collMeta.PartitionIDs)) - partMeta, err := core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[1]) + partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[1], 0) assert.Nil(t, err) assert.Equal(t, partName, partMeta.PartitionName) @@ -563,10 +563,10 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collName, pm.GetCollArray()[0]) // check DD operation info - flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "true", flag) - ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation err = json.Unmarshal([]byte(ddOpStr), &ddOp) @@ -599,7 +599,7 @@ func TestMasterService(t *testing.T) { }) t.Run("show partition", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName(collName) + coll, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) req := &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ @@ -620,10 +620,10 @@ func TestMasterService(t *testing.T) { }) t.Run("show segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName(collName) + coll, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) partID := coll.PartitionIDs[1] - part, err := 
core.MetaTable.GetPartitionByID(partID) + part, err := core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Zero(t, len(part.SegmentIDs)) @@ -656,7 +656,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) time.Sleep(time.Second) - part, err = core.MetaTable.GetPartitionByID(partID) + part, err = core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, 1, len(part.SegmentIDs)) @@ -695,7 +695,7 @@ func TestMasterService(t *testing.T) { }, }, } - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 0, len(collMeta.FieldIndexes)) @@ -706,7 +706,7 @@ func TestMasterService(t *testing.T) { files := im.getFileArray() assert.Equal(t, 3, len(files)) assert.ElementsMatch(t, files, []string{"file0-100", "file1-100", "file2-100"}) - collMeta, err = core.MetaTable.GetCollectionByName(collName) + collMeta, err = core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 1, len(collMeta.FieldIndexes)) idxMeta, err := core.MetaTable.GetIndexByID(collMeta.FieldIndexes[0].IndexID) @@ -720,7 +720,7 @@ func TestMasterService(t *testing.T) { }) t.Run("describe segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName(collName) + coll, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) req := &milvuspb.DescribeSegmentRequest{ @@ -780,10 +780,10 @@ func TestMasterService(t *testing.T) { }) t.Run("flush segment", func(t *testing.T) { - coll, err := core.MetaTable.GetCollectionByName(collName) + coll, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) partID := coll.PartitionIDs[1] - part, err := core.MetaTable.GetPartitionByID(partID) + part, err := core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, 1, len(part.SegmentIDs)) @@ -816,7 +816,7 @@ func TestMasterService(t *testing.T) { assert.Nil(t, err) time.Sleep(time.Second) - part, err = core.MetaTable.GetPartitionByID(partID) + part, err = core.MetaTable.GetPartitionByID(1, partID, 0) assert.Nil(t, err) assert.Equal(t, 2, len(part.SegmentIDs)) @@ -875,7 +875,7 @@ func TestMasterService(t *testing.T) { }, } - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 1, len(collMeta.FieldIndexes)) oldIdx := collMeta.FieldIndexes[0].IndexID @@ -885,7 +885,7 @@ func TestMasterService(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode) time.Sleep(time.Second) - collMeta, err = core.MetaTable.GetCollectionByName(collName) + collMeta, err = core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 2, len(collMeta.FieldIndexes)) assert.Equal(t, oldIdx, collMeta.FieldIndexes[0].IndexID) @@ -943,16 +943,16 @@ func TestMasterService(t *testing.T) { CollectionName: collName, PartitionName: partName, } - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) dropPartID := collMeta.PartitionIDs[1] status, err := core.DropPartition(ctx, req) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - collMeta, err = core.MetaTable.GetCollectionByName(collName) + collMeta, err = core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) assert.Equal(t, 1, len(collMeta.PartitionIDs)) - partMeta, err := 
core.MetaTable.GetPartitionByID(collMeta.PartitionIDs[0]) + partMeta, err := core.MetaTable.GetPartitionByID(1, collMeta.PartitionIDs[0], 0) assert.Nil(t, err) assert.Equal(t, Params.DefaultPartitionName, partMeta.PartitionName) @@ -968,10 +968,10 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collName, pm.GetCollArray()[1]) // check DD operation info - flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "true", flag) - ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation err = json.Unmarshal([]byte(ddOpStr), &ddOp) @@ -996,7 +996,7 @@ func TestMasterService(t *testing.T) { DbName: dbName, CollectionName: collName, } - collMeta, err := core.MetaTable.GetCollectionByName(collName) + collMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) status, err := core.DropCollection(ctx, req) assert.Nil(t, err) @@ -1042,10 +1042,10 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collName, collArray[2]) // check DD operation info - flag, err := core.MetaTable.client.Load(DDMsgSendPrefix) + flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "true", flag) - ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix) + ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0) assert.Nil(t, err) var ddOp DdOperation err = json.Unmarshal([]byte(ddOpStr), &ddOp) @@ -1657,10 +1657,6 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.metaKV = &etcdkv.EtcdKV{} - err = c.checkInit() - assert.NotNil(t, err) - c.kvBase = &etcdkv.EtcdKV{} err = c.checkInit() assert.NotNil(t, err) diff --git a/internal/masterservice/meta_snapshot.go b/internal/masterservice/meta_snapshot.go new file mode 100644 index 0000000000000000000000000000000000000000..178ee2867ae1ce8eb5b5f9e03f37f4d04ae68084 --- /dev/null +++ b/internal/masterservice/meta_snapshot.go @@ -0,0 +1,395 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. 
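The new meta_snapshot.go below backs the SnapShotKV interface added in internal/kv/kv.go with etcd multi-version reads: every Save returns the timestamp it committed at, and Load accepts any earlier timestamp to read the value as of that point, with ts == 0 meaning the latest value. A minimal usage sketch of those semantics follows (the helper function is illustrative only, not part of this patch):

// A minimal sketch of the SnapShotKV semantics implemented by meta_snapshot.go.
// snapshotReadExample is a hypothetical helper, not part of this patch.
package masterservice

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

func snapshotReadExample(snap kv.SnapShotKV) error {
	ts1, err := snap.Save("demo-key", "v1")
	if err != nil {
		return err
	}
	if _, err = snap.Save("demo-key", "v2"); err != nil {
		return err
	}
	// Reading at ts1 still returns "v1", even though "v2" is now the latest value.
	old, err := snap.Load("demo-key", ts1)
	if err != nil {
		return err
	}
	// Reading at ts == 0 returns the current value, "v2".
	cur, err := snap.Load("demo-key", typeutil.Timestamp(0))
	if err != nil {
		return err
	}
	fmt.Println(old, cur) // expected: v1 v2
	return nil
}

The same ts argument is what the metaTable methods later in this patch (GetCollectionByName, GetPartitionByID, ListCollections, and so on) thread through to the snapshot client.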
+ +package masterservice + +import ( + "context" + "fmt" + "path" + "strconv" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +const ( + RequestTimeout = 10 * time.Second +) + +type rtPair struct { + rev int64 + ts typeutil.Timestamp +} + +type metaSnapshot struct { + cli *clientv3.Client + root string + tsKey string + lock sync.RWMutex + timeAllactor func() typeutil.Timestamp + + ts2Rev []rtPair + minPos int + maxPos int + numTs int +} + +func newMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int, timeAllactor func() typeutil.Timestamp) (*metaSnapshot, error) { + if bufSize <= 0 { + bufSize = 1024 + } + ms := &metaSnapshot{ + cli: cli, + root: root, + tsKey: tsKey, + lock: sync.RWMutex{}, + timeAllactor: timeAllactor, + ts2Rev: make([]rtPair, bufSize), + minPos: 0, + maxPos: 0, + numTs: 0, + } + if err := ms.loadTs(); err != nil { + return nil, err + } + return ms, nil +} + +func (ms *metaSnapshot) loadTs() error { + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + key := path.Join(ms.root, ms.tsKey) + resp, err := ms.cli.Get(ctx, key) + if err != nil { + return err + } + if len(resp.Kvs) <= 0 { + return nil + } + version := resp.Kvs[0].Version + revision := resp.Kvs[0].ModRevision + strTs := string(resp.Kvs[0].Value) + ts, err := strconv.ParseUint(strTs, 10, 64) + if err != nil { + return err + } + log.Info("load last ts", zap.Int64("version", version), zap.Int64("revision", revision)) + + ms.initTs(revision, ts) + for version--; version > 0; version-- { + if ms.numTs == len(ms.ts2Rev) { + break + } + revision-- + resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision)) + if err != nil { + return err + } + if len(resp.Kvs) <= 0 { + return nil + } + + curVer := resp.Kvs[0].Version + curRev := resp.Kvs[0].ModRevision + if curVer > version { + return nil + } + strTs := string(resp.Kvs[0].Value) + curTs, err := strconv.ParseUint(strTs, 10, 64) + if err != nil { + return err + } + if curTs >= ts { + return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts) + } + ms.initTs(curRev, curTs) + ts = curTs + revision = curRev + } + + return nil +} + +func (ms *metaSnapshot) maxTs() typeutil.Timestamp { + return ms.ts2Rev[ms.maxPos].ts +} + +func (ms *metaSnapshot) minTs() typeutil.Timestamp { + return ms.ts2Rev[ms.minPos].ts +} + +func (ms *metaSnapshot) initTs(rev int64, ts typeutil.Timestamp) { + log.Debug("init meta snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) + if ms.numTs == 0 { + ms.maxPos = len(ms.ts2Rev) - 1 + ms.minPos = len(ms.ts2Rev) - 1 + ms.numTs = 1 + ms.ts2Rev[ms.maxPos].rev = rev + ms.ts2Rev[ms.maxPos].ts = ts + } else if ms.numTs < len(ms.ts2Rev) { + ms.minPos-- + ms.numTs++ + ms.ts2Rev[ms.minPos].rev = rev + ms.ts2Rev[ms.minPos].ts = ts + } +} + +func (ms *metaSnapshot) putTs(rev int64, ts typeutil.Timestamp) { + log.Debug("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) + ms.maxPos++ + if ms.maxPos == len(ms.ts2Rev) { + ms.maxPos = 0 + } + + ms.ts2Rev[ms.maxPos].rev = rev + ms.ts2Rev[ms.maxPos].ts = ts + if ms.numTs < len(ms.ts2Rev) { + ms.numTs++ + } else { + ms.minPos++ + if ms.minPos == len(ms.ts2Rev) { + ms.minPos = 0 + } + } +} + +func (ms *metaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 { + if length == 1 { + return ms.ts2Rev[start].rev + } + begin := start + end := begin + length + mid := (begin + end) / 2 + for { + 
if ms.ts2Rev[mid].ts == ts { + return ms.ts2Rev[mid].rev + } + if mid == begin { + if ms.ts2Rev[mid].ts < ts || mid == start { + return ms.ts2Rev[mid].rev + } + return ms.ts2Rev[mid-1].rev + } + if ms.ts2Rev[mid].ts > ts { + end = mid + } else if ms.ts2Rev[mid].ts < ts { + begin = mid + 1 + } + mid = (begin + end) / 2 + } +} + +func (ms *metaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 { + if ms.numTs == 0 { + return 0 + } + if ts >= ms.ts2Rev[ms.maxPos].ts { + return ms.ts2Rev[ms.maxPos].rev + } + if ts < ms.ts2Rev[ms.minPos].ts { + return 0 + } + if ms.maxPos > ms.minPos { + return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1) + } + topVal := ms.ts2Rev[len(ms.ts2Rev)-1] + botVal := ms.ts2Rev[0] + minVal := ms.ts2Rev[ms.minPos] + maxVal := ms.ts2Rev[ms.maxPos] + if ts >= topVal.ts && ts < botVal.ts { + return topVal.rev + } else if ts >= minVal.ts && ts < topVal.ts { + return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos) + } else if ts >= botVal.ts && ts < maxVal.ts { + return ms.searchOnCache(ts, 0, ms.maxPos+1) + } + + return 0 +} + +func (ms *metaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 { + if rev < 2 { + return 0 + } + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + for rev--; rev >= 2; rev-- { + resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev)) + if err != nil { + log.Debug("get ts from etcd failed", zap.Error(err)) + return 0 + } + if len(resp.Kvs) <= 0 { + return 0 + } + rev = resp.Kvs[0].ModRevision + curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) + if err != nil { + log.Debug("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err)) + return 0 + } + if curTs <= ts { + return rev + } + } + return 0 +} + +func (ms *metaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) { + rev := ms.getRevOnCache(ts) + if rev > 0 { + return rev, nil + } + rev = ms.ts2Rev[ms.minPos].rev + rev = ms.getRevOnEtcd(ts, rev) + if rev > 0 { + return rev, nil + } + return 0, fmt.Errorf("can't find revision on ts=%d", ts) +} + +func (ms *metaSnapshot) Save(key, value string) (typeutil.Timestamp, error) { + ms.lock.Lock() + defer ms.lock.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + ts := ms.timeAllactor() + strTs := strconv.FormatInt(int64(ts), 10) + resp, err := ms.cli.Txn(ctx).If().Then( + clientv3.OpPut(path.Join(ms.root, key), value), + clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs), + ).Commit() + if err != nil { + return 0, err + } + ms.putTs(resp.Header.Revision, ts) + + return ts, nil +} + +func (ms *metaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { + ms.lock.RLock() + defer ms.lock.RUnlock() + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + var resp *clientv3.GetResponse + var err error + var rev int64 + if ts == 0 { + resp, err = ms.cli.Get(ctx, path.Join(ms.root, key)) + if err != nil { + return "", err + } + } else { + rev, err = ms.getRev(ts) + if err != nil { + return "", err + } + resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev)) + if err != nil { + return "", err + } + } + if len(resp.Kvs) == 0 { + return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts) + } + return string(resp.Kvs[0].Value), nil +} + +func (ms *metaSnapshot) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) { + ms.lock.Lock() + defer ms.lock.Unlock() + ctx, 
cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + ts := ms.timeAllactor() + strTs := strconv.FormatInt(int64(ts), 10) + ops := make([]clientv3.Op, 0, len(kvs)+1) + for key, value := range kvs { + ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) + } + ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) + resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() + if err != nil { + return 0, err + } + ms.putTs(resp.Header.Revision, ts) + return ts, nil +} +func (ms *metaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { + ms.lock.RLock() + defer ms.lock.RUnlock() + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + var resp *clientv3.GetResponse + var err error + var rev int64 + if ts == 0 { + resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, err + } + } else { + rev, err = ms.getRev(ts) + if err != nil { + return nil, nil, err + } + resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, err + } + } + keys := make([]string, 0, len(resp.Kvs)) + values := make([]string, 0, len(resp.Kvs)) + tk := path.Join(ms.root, "k") + prefixLen := len(tk) - 1 + for _, kv := range resp.Kvs { + tk = string(kv.Key) + tk = tk[prefixLen:] + keys = append(keys, tk) + values = append(values, string(kv.Value)) + } + return keys, values, nil +} + +func (ms *metaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) { + ms.lock.Lock() + defer ms.lock.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) + defer cancel() + + ts := ms.timeAllactor() + strTs := strconv.FormatInt(int64(ts), 10) + ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) + } + for _, key := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key))) + } + ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) + resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() + if err != nil { + return 0, err + } + ms.putTs(resp.Header.Revision, ts) + return ts, nil +} diff --git a/internal/masterservice/meta_snapshot_test.go b/internal/masterservice/meta_snapshot_test.go new file mode 100644 index 0000000000000000000000000000000000000000..feaf5c5b4f94299c75d86a4e34de4a0c962d6b9e --- /dev/null +++ b/internal/masterservice/meta_snapshot_test.go @@ -0,0 +1,398 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. 
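The metaSnapshot above keeps the most recent (timestamp, revision) pairs in a fixed-size ring buffer (ts2Rev with minPos/maxPos) and falls back to getRevOnEtcd, which walks older revisions of the timestamp key, when a requested timestamp predates what the buffer still holds. A simplified, self-contained sketch of that cache idea follows (a linear scan here instead of the binary search above; illustrative only, not the patch code):

// Simplified illustration of the ts -> revision cache: keep the last N
// (timestamp, revision) pairs in a ring buffer and answer "which etcd
// revision was current at ts?" with the newest recorded pair whose
// timestamp is <= ts.
package main

import "fmt"

type pair struct {
	ts  uint64
	rev int64
}

type tsCache struct {
	buf  []pair
	next int // position of the next write
	size int // number of valid entries, capped at len(buf)
}

func newTsCache(n int) *tsCache { return &tsCache{buf: make([]pair, n)} }

// put records that revision rev became current at timestamp ts.
// Timestamps are assumed to arrive in increasing order.
func (c *tsCache) put(ts uint64, rev int64) {
	c.buf[c.next] = pair{ts: ts, rev: rev}
	c.next = (c.next + 1) % len(c.buf)
	if c.size < len(c.buf) {
		c.size++
	}
}

// revAt returns the revision current at ts, or 0 if ts is older than
// everything still held in the buffer.
func (c *tsCache) revAt(ts uint64) int64 {
	var best int64
	var bestTs uint64
	for i := 0; i < c.size; i++ {
		p := c.buf[i]
		if p.ts <= ts && p.ts >= bestTs {
			best, bestTs = p.rev, p.ts
		}
	}
	return best
}

func main() {
	c := newTsCache(4)
	for i := 0; i < 8; i++ {
		c.put(uint64(100+i*10), int64(i+1)) // ts 100,110,...,170 -> rev 1..8
	}
	fmt.Println(c.revAt(165)) // 7: rev 7 (ts 160) is the newest entry <= 165
	fmt.Println(c.revAt(90))  // 0: older than anything the 4-slot buffer kept
}

The tests that follow exercise the real buffer, including the wrap-around cases that this simplified version sidesteps.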
+ +package masterservice + +import ( + "context" + "fmt" + "math/rand" + "path" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/clientv3" +) + +func TestMetaSnapshot(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + etcdAddr := Params.EtcdAddress + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + tsKey := "timestamp" + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + defer etcdCli.Close() + + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + return vtso + } + + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 8; i++ { + vtso = typeutil.Timestamp(100 + i) + ts, err := ms.Save("abc", fmt.Sprintf("value-%d", i)) + assert.Nil(t, err) + assert.Equal(t, vtso, ts) + _, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i)) + assert.Nil(t, err) + } + + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 4, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) +} + +func TestSearchOnCache(t *testing.T) { + ms := &metaSnapshot{} + for i := 0; i < 8; i++ { + ms.ts2Rev = append(ms.ts2Rev, + rtPair{ + rev: int64(i * 2), + ts: typeutil.Timestamp(i * 2), + }) + } + rev := ms.searchOnCache(9, 0, 8) + assert.Equal(t, int64(8), rev) + rev = ms.searchOnCache(1, 0, 2) + assert.Equal(t, int64(0), rev) + rev = ms.searchOnCache(1, 0, 8) + assert.Equal(t, int64(0), rev) + rev = ms.searchOnCache(14, 0, 8) + assert.Equal(t, int64(14), rev) + rev = ms.searchOnCache(0, 0, 8) + assert.Equal(t, int64(0), rev) +} + +func TestGetRevOnCache(t *testing.T) { + ms := &metaSnapshot{} + ms.ts2Rev = make([]rtPair, 7) + ms.initTs(7, 16) + ms.initTs(6, 14) + ms.initTs(5, 12) + ms.initTs(4, 10) + + var rev int64 + rev = ms.getRevOnCache(17) + assert.Equal(t, int64(7), rev) + rev = ms.getRevOnCache(9) + assert.Equal(t, int64(0), rev) + rev = ms.getRevOnCache(10) + assert.Equal(t, int64(4), rev) + rev = ms.getRevOnCache(16) + assert.Equal(t, int64(7), rev) + rev = ms.getRevOnCache(15) + assert.Equal(t, int64(6), rev) + rev = ms.getRevOnCache(12) + assert.Equal(t, int64(5), rev) + + ms.initTs(3, 8) + ms.initTs(2, 6) + assert.Equal(t, ms.maxPos, 6) + assert.Equal(t, ms.minPos, 1) + + rev = ms.getRevOnCache(17) + assert.Equal(t, int64(7), rev) + rev = ms.getRevOnCache(9) + assert.Equal(t, int64(3), rev) + rev = ms.getRevOnCache(10) + assert.Equal(t, int64(4), rev) + rev = ms.getRevOnCache(16) + assert.Equal(t, int64(7), rev) + rev = ms.getRevOnCache(15) + assert.Equal(t, int64(6), rev) + rev = ms.getRevOnCache(12) + assert.Equal(t, int64(5), rev) + rev = ms.getRevOnCache(5) + assert.Equal(t, int64(0), rev) + + ms.putTs(8, 18) + assert.Equal(t, ms.maxPos, 0) + assert.Equal(t, ms.minPos, 1) + for rev = 2; rev <= 7; rev++ { + ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3)) + assert.Equal(t, rev, ts) + } + ms.putTs(9, 20) + assert.Equal(t, ms.maxPos, 1) + assert.Equal(t, ms.minPos, 2) + assert.Equal(t, ms.numTs, 7) + + curMax := ms.maxPos + curMin := ms.minPos + for i := 10; i < 20; i++ { + ms.putTs(int64(i), typeutil.Timestamp(i*2+2)) + curMax++ + curMin++ + if curMax == len(ms.ts2Rev) { + curMax = 0 + } + if curMin == len(ms.ts2Rev) { + curMin = 0 + } + assert.Equal(t, curMax, ms.maxPos) + assert.Equal(t, curMin, ms.minPos) + } + + for i := 13; i < 20; i++ { + rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2)) + assert.Equal(t, 
int64(i), rev) + rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3)) + assert.Equal(t, int64(i), rev) + } + rev = ms.getRevOnCache(27) + assert.Zero(t, rev) +} + +func TestGetRevOnEtcd(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + etcdAddr := Params.EtcdAddress + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + tsKey := "timestamp" + key := path.Join(rootPath, tsKey) + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + defer etcdCli.Close() + + ms := metaSnapshot{ + cli: etcdCli, + root: rootPath, + tsKey: tsKey, + } + resp, err := etcdCli.Put(ctx, key, "100") + assert.Nil(t, err) + revList := []int64{} + tsList := []typeutil.Timestamp{} + revList = append(revList, resp.Header.Revision) + tsList = append(tsList, 100) + for i := 110; i < 200; i += 10 { + resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i)) + assert.Nil(t, err) + revList = append(revList, resp.Header.Revision) + tsList = append(tsList, typeutil.Timestamp(i)) + } + lastRev := revList[len(revList)-1] + 1 + for i, ts := range tsList { + rev := ms.getRevOnEtcd(ts, lastRev) + assert.Equal(t, revList[i], rev) + } + for i := 0; i < len(tsList); i++ { + rev := ms.getRevOnEtcd(tsList[i]+5, lastRev) + assert.Equal(t, revList[i], rev) + } + rev := ms.getRevOnEtcd(200, lastRev) + assert.Equal(t, lastRev-1, rev) + rev = ms.getRevOnEtcd(99, lastRev) + assert.Zero(t, rev) +} + +func TestLoad(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + etcdAddr := Params.EtcdAddress + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + tsKey := "timestamp" + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + defer etcdCli.Close() + + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + return vtso + } + + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + vtso = typeutil.Timestamp(100 + i*5) + ts, err := ms.Save("key", fmt.Sprintf("value-%d", i)) + assert.Nil(t, err) + assert.Equal(t, vtso, ts) + } + for i := 0; i < 20; i++ { + val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, val, fmt.Sprintf("value-%d", i)) + } + val, err := ms.Load("key", 0) + assert.Nil(t, err) + assert.Equal(t, "value-19", val) + + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, val, fmt.Sprintf("value-%d", i)) + } +} + +func TestMultiSave(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + etcdAddr := Params.EtcdAddress + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + tsKey := "timestamp" + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + defer etcdCli.Close() + + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + return vtso + } + + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} + vtso = typeutil.Timestamp(100 + i*5) + ts, err := ms.MultiSave(saves) + assert.Nil(t, 
err) + assert.Equal(t, vtso, ts) + } + for i := 0; i < 20; i++ { + keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, len(keys), len(vals)) + assert.Equal(t, len(keys), 2) + assert.Equal(t, keys[0], "k1") + assert.Equal(t, keys[1], "k2") + assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) + assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) + } + keys, vals, err := ms.LoadWithPrefix("k", 0) + assert.Nil(t, err) + assert.Equal(t, len(keys), len(vals)) + assert.Equal(t, len(keys), 2) + assert.Equal(t, keys[0], "k1") + assert.Equal(t, keys[1], "k2") + assert.Equal(t, vals[0], "v1-19") + assert.Equal(t, vals[1], "v2-19") + + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, len(keys), len(vals)) + assert.Equal(t, len(keys), 2) + assert.Equal(t, keys[0], "k1") + assert.Equal(t, keys[1], "k2") + assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) + assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) + } +} + +func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + etcdAddr := Params.EtcdAddress + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + tsKey := "timestamp" + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + return vtso + } + defer etcdCli.Close() + + ms, err := newMetaSnapshot(etcdCli, rootPath, tsKey, 7, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + vtso = typeutil.Timestamp(100 + i*5) + ts, err := ms.Save(fmt.Sprintf("kd-%d", i), fmt.Sprintf("value-%d", i)) + assert.Nil(t, err) + assert.Equal(t, vtso, ts) + } + for i := 20; i < 40; i++ { + sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} + dm := []string{fmt.Sprintf("kd-%d", i-20)} + vtso = typeutil.Timestamp(100 + i*5) + ts, err := ms.MultiSaveAndRemoveWithPrefix(sm, dm) + assert.Nil(t, err) + assert.Equal(t, vtso, ts) + } + + for i := 0; i < 20; i++ { + val, err := ms.Load(fmt.Sprintf("kd-%d", i), typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, i+1, len(vals)) + } + for i := 20; i < 40; i++ { + val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, 39-i, len(vals)) + } + + ms, err = newMetaSnapshot(etcdCli, rootPath, tsKey, 11, ftso) + assert.Nil(t, err) + assert.NotNil(t, ms) + + for i := 0; i < 20; i++ { + val, err := ms.Load(fmt.Sprintf("kd-%d", i), typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, i+1, len(vals)) + } + for i := 20; i < 40; i++ { + val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.Nil(t, err) + assert.Equal(t, 39-i, len(vals)) + } +} diff --git 
a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index f8f81f6d67f47785d413066c346922dae5d08521..135a6f78951a14bc0da9791a0c5df2fbd7e86fca 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -39,6 +39,8 @@ const ( SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" IndexMetaPrefix = ComponentPrefix + "/index" + TimestampPrefix = ComponentPrefix + "/timestamp" + DDOperationPrefix = ComponentPrefix + "/dd-operation" DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" @@ -49,7 +51,7 @@ const ( ) type metaTable struct { - client kv.TxnKV // client of a reliable kv service, i.e. etcd client + client kv.SnapShotKV // client of a reliable kv service, i.e. etcd client tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta @@ -68,7 +70,7 @@ type metaTable struct { ddLock sync.RWMutex } -func NewMetaTable(kv kv.TxnKV) (*metaTable, error) { +func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) { mt := &metaTable{ client: kv, tenantLock: sync.RWMutex{}, @@ -97,7 +99,7 @@ func (mt *metaTable) reloadFromKV() error { mt.flushedSegID = make(map[typeutil.UniqueID]bool) mt.vChan2Chan = make(map[string]string) - _, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix) + _, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0) if err != nil { return err } @@ -111,7 +113,7 @@ func (mt *metaTable) reloadFromKV() error { mt.tenantID2Meta[tenantMeta.ID] = tenantMeta } - _, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix) + _, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0) if err != nil { return err } @@ -125,7 +127,7 @@ func (mt *metaTable) reloadFromKV() error { mt.proxyID2Meta[proxyMeta.ID] = proxyMeta } - _, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix) + _, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0) if err != nil { return err } @@ -147,7 +149,7 @@ func (mt *metaTable) reloadFromKV() error { } } - _, values, err = mt.client.LoadWithPrefix(PartitionMetaPrefix) + _, values, err = mt.client.LoadWithPrefix(PartitionMetaPrefix, 0) if err != nil { return err } @@ -170,7 +172,7 @@ func (mt *metaTable) reloadFromKV() error { } } - _, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix) + _, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0) if err != nil { return err } @@ -190,7 +192,7 @@ func (mt *metaTable) reloadFromKV() error { } } - _, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix) + _, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0) if err != nil { return err } @@ -206,49 +208,51 @@ func (mt *metaTable) reloadFromKV() error { return nil } -func (mt *metaTable) AddTenant(te *pb.TenantMeta) error { +func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) { mt.tenantLock.Lock() defer mt.tenantLock.Unlock() k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID) v := proto.MarshalTextString(te) - if err := mt.client.Save(k, v); err != nil { - return err + ts, err := mt.client.Save(k, v) + if err != nil { + return 0, err } mt.tenantID2Meta[te.ID] = *te - return nil + return ts, nil } -func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error { +func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) { mt.proxyLock.Lock() defer mt.proxyLock.Unlock() k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID) v := 
proto.MarshalTextString(po) - if err := mt.client.Save(k, v); err != nil { - return err + ts, err := mt.client.Save(k, v) + if err != nil { + return 0, err } mt.proxyID2Meta[po.ID] = *po - return nil + return ts, nil } -func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) error { +func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo, ddOpStr string) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() if len(part.SegmentIDs) != 0 { - return errors.New("segment should be empty when creating collection") + return 0, errors.New("segment should be empty when creating collection") } if len(coll.PartitionIDs) != 0 { - return errors.New("partitions should be empty when creating collection") + return 0, errors.New("partitions should be empty when creating collection") } if _, ok := mt.collName2ID[coll.Schema.Name]; ok { - return fmt.Errorf("collection %s exist", coll.Schema.Name) + return 0, fmt.Errorf("collection %s exist", coll.Schema.Name) } if len(coll.FieldIndexes) != len(idx) { - return fmt.Errorf("incorrect index id when creating collection") + return 0, fmt.Errorf("incorrect index id when creating collection") } coll.PartitionIDs = append(coll.PartitionIDs, part.PartitionID) @@ -281,22 +285,22 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionIn meta[DDOperationPrefix] = ddOpStr meta[DDMsgSendPrefix] = "false" - err := mt.client.MultiSave(meta) + ts, err := mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() - return err + return 0, err } - return nil + return ts, nil } -func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) error { +func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() collMeta, ok := mt.collID2Meta[collID] if !ok { - return fmt.Errorf("can't find collection. id = %d", collID) + return 0, fmt.Errorf("can't find collection. 
id = %d", collID) } delete(mt.collID2Meta, collID) @@ -347,49 +351,84 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr string) DDMsgSendPrefix: "false", } - err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys) if err != nil { _ = mt.reloadFromKV() - return err + return 0, err } - return nil + return ts, nil } -func (mt *metaTable) HasCollection(collID typeutil.UniqueID) bool { +func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - _, ok := mt.collID2Meta[collID] - return ok + if ts == 0 { + _, ok := mt.collID2Meta[collID] + return ok + } + key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) + _, err := mt.client.Load(key, ts) + return err == nil } -func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID) (*pb.CollectionInfo, error) { +func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - col, ok := mt.collID2Meta[collectionID] - if !ok { - return nil, fmt.Errorf("can't find collection id : %d", collectionID) + if ts == 0 { + col, ok := mt.collID2Meta[collectionID] + if !ok { + return nil, fmt.Errorf("can't find collection id : %d", collectionID) + } + colCopy := proto.Clone(&col) + return colCopy.(*pb.CollectionInfo), nil } - colCopy := proto.Clone(&col) - - return colCopy.(*pb.CollectionInfo), nil + key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID) + val, err := mt.client.Load(key, ts) + if err != nil { + return nil, err + } + colMeta := pb.CollectionInfo{} + err = proto.UnmarshalText(val, &colMeta) + if err != nil { + return nil, err + } + return &colMeta, nil } -func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionInfo, error) { +func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - vid, ok := mt.collName2ID[collectionName] - if !ok { - return nil, fmt.Errorf("can't find collection: " + collectionName) + if ts == 0 { + vid, ok := mt.collName2ID[collectionName] + if !ok { + return nil, fmt.Errorf("can't find collection: " + collectionName) + } + col, ok := mt.collID2Meta[vid] + if !ok { + return nil, fmt.Errorf("can't find collection: " + collectionName) + } + colCopy := proto.Clone(&col) + return colCopy.(*pb.CollectionInfo), nil } - col, ok := mt.collID2Meta[vid] - if !ok { - return nil, fmt.Errorf("can't find collection: " + collectionName) + _, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts) + if err != nil { + return nil, err } - colCopy := proto.Clone(&col) - return colCopy.(*pb.CollectionInfo), nil + for _, val := range vals { + collMeta := pb.CollectionInfo{} + err = proto.UnmarshalText(val, &collMeta) + if err != nil { + log.Debug("unmarshal collection info failed", zap.Error(err)) + continue + } + if collMeta.Schema.Name == collectionName { + return &collMeta, nil + } + } + return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts) } func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error) { @@ -408,28 +447,44 @@ func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.Coll return colCopy.(*pb.CollectionInfo), nil } -func (mt *metaTable) ListCollections() ([]string, error) { +func (mt 
*metaTable) ListCollections(ts typeutil.Timestamp) ([]string, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - colls := make([]string, 0, len(mt.collName2ID)) - for name := range mt.collName2ID { - colls = append(colls, name) + if ts == 0 { + colls := make([]string, 0, len(mt.collName2ID)) + for name := range mt.collName2ID { + colls = append(colls, name) + } + return colls, nil + } + _, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts) + if err != nil { + return nil, err + } + colls := make([]string, 0, len(vals)) + for _, val := range vals { + collMeta := pb.CollectionInfo{} + err := proto.UnmarshalText(val, &collMeta) + if err != nil { + log.Debug("unmarshal collection info failed", zap.Error(err)) + } + colls = append(colls, collMeta.Schema.Name) } return colls, nil } -func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) error { +func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr string) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] if !ok { - return fmt.Errorf("can't find collection. id = %d", collID) + return 0, fmt.Errorf("can't find collection. id = %d", collID) } // number of partition tags (except _default) should be limited to 4096 by default if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum { - return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum) + return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum) } for _, t := range coll.PartitionIDs { part, ok := mt.partitionID2Meta[t] @@ -438,10 +493,10 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string continue } if part.PartitionName == partitionName { - return fmt.Errorf("partition name = %s already exists", partitionName) + return 0, fmt.Errorf("partition name = %s already exists", partitionName) } if part.PartitionID == partitionID { - return fmt.Errorf("partition id = %d already exists", partitionID) + return 0, fmt.Errorf("partition id = %d already exists", partitionID) } } partMeta := pb.PartitionInfo{ @@ -464,52 +519,83 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string meta[DDOperationPrefix] = ddOpStr meta[DDMsgSendPrefix] = "false" - err := mt.client.MultiSave(meta) + ts, err := mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() - return err + return 0, err } - return nil + return ts, nil } -func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) { - collMeta, ok := mt.collID2Meta[collID] - if !ok { - return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID) +func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error) { + if ts == 0 { + collMeta, ok := mt.collID2Meta[collID] + if !ok { + return pb.PartitionInfo{}, fmt.Errorf("can't find collection id = %d", collID) + } + for _, id := range collMeta.PartitionIDs { + partMeta, ok := mt.partitionID2Meta[id] + if ok && partMeta.PartitionName == partitionName { + return partMeta, nil + } + } + return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName) + } + collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID) + collVal, err := mt.client.Load(collKey, ts) + if err != nil { + return pb.PartitionInfo{}, 
err + } + collMeta := pb.CollectionMeta{} + err = proto.UnmarshalText(collVal, &collMeta) + if err != nil { + return pb.PartitionInfo{}, err } for _, id := range collMeta.PartitionIDs { - partMeta, ok := mt.partitionID2Meta[id] - if ok && partMeta.PartitionName == partitionName { + partKey := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, id) + partVal, err := mt.client.Load(partKey, ts) + if err != nil { + log.Debug("load partition meta failed", zap.String("collection name", collMeta.Schema.Name), zap.Int64("partition id", id)) + continue + } + partMeta := pb.PartitionInfo{} + err = proto.UnmarshalText(partVal, &partMeta) + if err != nil { + log.Debug("unmarshal partition meta failed", zap.Error(err)) + continue + } + if partMeta.PartitionName == partitionName { return partMeta, nil } } return pb.PartitionInfo{}, fmt.Errorf("partition %s does not exist", partitionName) } -func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string) (pb.PartitionInfo, error) { +func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (pb.PartitionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - return mt.getPartitionByName(collID, partitionName) + return mt.getPartitionByName(collID, partitionName, ts) } -func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool { +func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - _, err := mt.getPartitionByName(collID, partitionName) + _, err := mt.getPartitionByName(collID, partitionName, ts) return err == nil } -func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.UniqueID, error) { +//return timestamp, partitionid, error +func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr string) (typeutil.Timestamp, typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() if partitionName == Params.DefaultPartitionName { - return 0, fmt.Errorf("default partition cannot be deleted") + return 0, 0, fmt.Errorf("default partition cannot be deleted") } collMeta, ok := mt.collID2Meta[collID] if !ok { - return 0, fmt.Errorf("can't find collection id = %d", collID) + return 0, 0, fmt.Errorf("can't find collection id = %d", collID) } // check tag exists @@ -529,7 +615,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str } } if !exist { - return 0, fmt.Errorf("partition %s does not exist", partitionName) + return 0, 0, fmt.Errorf("partition %s does not exist", partitionName) } delete(mt.partitionID2Meta, partMeta.PartitionID) collMeta.PartitionIDs = pd @@ -560,34 +646,48 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str meta[DDOperationPrefix] = ddOpStr meta[DDMsgSendPrefix] = "false" - err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys) if err != nil { _ = mt.reloadFromKV() - return 0, err + return 0, 0, err } - return partMeta.PartitionID, nil + return ts, partMeta.PartitionID, nil } -func (mt *metaTable) GetPartitionByID(partitionID typeutil.UniqueID) (pb.PartitionInfo, error) { +func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (pb.PartitionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - partMeta, ok := 
mt.partitionID2Meta[partitionID] - if !ok { - return pb.PartitionInfo{}, fmt.Errorf("partition id = %d not exist", partitionID) + if ts == 0 { + partMeta, ok := mt.partitionID2Meta[partitionID] + if !ok { + return pb.PartitionInfo{}, fmt.Errorf("partition id = %d not exist", partitionID) + } + return partMeta, nil } - return partMeta, nil + partKey := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, collID, partitionID) + partVal, err := mt.client.Load(partKey, ts) + if err != nil { + return pb.PartitionInfo{}, err + } + partInfo := pb.PartitionInfo{} + err = proto.UnmarshalText(partVal, &partInfo) + if err != nil { + return pb.PartitionInfo{}, err + } + return partInfo, nil + } -func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { +func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() collMeta, ok := mt.collID2Meta[seg.CollectionID] if !ok { - return fmt.Errorf("can't find collection id = %d", seg.CollectionID) + return 0, fmt.Errorf("can't find collection id = %d", seg.CollectionID) } partMeta, ok := mt.partitionID2Meta[seg.PartitionID] if !ok { - return fmt.Errorf("can't find partition id = %d", seg.PartitionID) + return 0, fmt.Errorf("can't find partition id = %d", seg.PartitionID) } exist := false for _, partID := range collMeta.PartitionIDs { @@ -597,7 +697,7 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { } } if !exist { - return fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID) + return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID) } exist = false for _, segID := range partMeta.SegmentIDs { @@ -606,7 +706,7 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { } } if exist { - return fmt.Errorf("segment id = %d exist", seg.ID) + return 0, fmt.Errorf("segment id = %d exist", seg.ID) } partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.ID) mt.partitionID2Meta[seg.PartitionID] = partMeta @@ -615,30 +715,30 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error { k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID) v := proto.MarshalTextString(&partMeta) - err := mt.client.Save(k, v) + ts, err := mt.client.Save(k, v) if err != nil { _ = mt.reloadFromKV() - return err + return 0, err } - return nil + return ts, nil } -func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error { +func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() collID, ok := mt.segID2CollID[segIdxInfo.SegmentID] if !ok { - return fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID) + return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID) } collMeta, ok := mt.collID2Meta[collID] if !ok { - return fmt.Errorf("collection id = %d not found", collID) + return 0, fmt.Errorf("collection id = %d not found", collID) } partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID] if !ok { - return fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID) + return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID) } exist := false for _, fidx := range collMeta.FieldIndexes { @@ -648,7 +748,7 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error { } } if !exist { - return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID) + return 0, 
fmt.Errorf("index id = %d not found", segIdxInfo.IndexID) } segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID] @@ -660,9 +760,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error { if ok { if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) { log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID)) - return nil + return 0, nil } - return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) + return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID) } } @@ -670,34 +770,35 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error { k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID) v := proto.MarshalTextString(segIdxInfo) - err := mt.client.Save(k, v) + ts, err := mt.client.Save(k, v) if err != nil { _ = mt.reloadFromKV() - return err + return 0, err } if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok { mt.flushedSegID[segIdxInfo.SegmentID] = true } - return nil + return ts, nil } -func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error) { +//return timestamp, index id, is dropped, error +func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() collID, ok := mt.collName2ID[collName] if !ok { - return 0, false, fmt.Errorf("collection name = %s not exist", collName) + return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName) } collMeta, ok := mt.collID2Meta[collID] if !ok { - return 0, false, fmt.Errorf("collection name = %s not has meta", collName) + return 0, 0, false, fmt.Errorf("collection name = %s not has meta", collName) } fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName) if err != nil { - return 0, false, err + return 0, 0, false, err } fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes)) var dropIdxID typeutil.UniqueID @@ -722,7 +823,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. } if len(fieldIdxInfo) == len(collMeta.FieldIndexes) { log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName)) - return 0, false, nil + return 0, 0, false, nil } collMeta.FieldIndexes = fieldIdxInfo mt.collID2Meta[collID] = collMeta @@ -751,13 +852,13 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. 
fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID), } - err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta) + ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta) if err != nil { _ = mt.reloadFromKV() - return 0, false, err + return 0, 0, false, err } - return dropIdxID, true, nil + return ts, dropIdxID, true, nil } func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) { @@ -930,7 +1031,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - err = mt.client.MultiSave(meta) + _, err = mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return nil, schemapb.FieldSchema{}, err @@ -953,7 +1054,7 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id meta[k] = v } - err = mt.client.MultiSave(meta) + _, err = mt.client.MultiSave(meta) if err != nil { _ = mt.reloadFromKV() return nil, schemapb.FieldSchema{}, err diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 9255cf6a7d465d01d293deb844a0479c854a3804..d4842e73aaf9363ca663882a3f554dd0123cb3bb 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -19,7 +19,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -32,36 +31,35 @@ import ( type mockTestKV struct { kv.TxnKV - loadWithPrefix func(key string) ([]string, []string, error) - save func(key, value string) error - multiSave func(kvs map[string]string) error - multiRemoveWithPrefix func(keys []string) error - multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) error + loadWithPrefix func(key string, ts typeutil.Timestamp) ([]string, []string, error) + save func(key, value string) (typeutil.Timestamp, error) + multiSave func(kvs map[string]string) (typeutil.Timestamp, error) + multiSaveAndRemoveWithPrefix func(saves map[string]string, removals []string) (typeutil.Timestamp, error) } -func (m *mockTestKV) LoadWithPrefix(key string) ([]string, []string, error) { - return m.loadWithPrefix(key) +func (m *mockTestKV) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { + return m.loadWithPrefix(key, ts) +} +func (m *mockTestKV) Load(key string, ts typeutil.Timestamp) (string, error) { + return "", nil } -func (m *mockTestKV) Save(key, value string) error { +func (m *mockTestKV) Save(key, value string) (typeutil.Timestamp, error) { return m.save(key, value) } -func (m *mockTestKV) MultiSave(kvs map[string]string) error { +func (m *mockTestKV) MultiSave(kvs map[string]string) (typeutil.Timestamp, error) { return m.multiSave(kvs) } -func (m *mockTestKV) MultiRemoveWithPrefix(keys []string) error { - return m.multiRemoveWithPrefix(keys) -} -func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (m *mockTestKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) (typeutil.Timestamp, error) { return m.multiSaveAndRemoveWithPrefix(saves, removals) } func Test_MockKV(t *testing.T) { k1 := &mockTestKV{} prefix := make(map[string][]string) - k1.loadWithPrefix = func(key string) ([]string, []string, error) { + k1.loadWithPrefix = 
func(key string, ts typeutil.Timestamp) ([]string, []string, error) { if val, ok := prefix[key]; ok { return nil, val, nil } @@ -131,17 +129,17 @@ func Test_MockKV(t *testing.T) { m1, err := NewMetaTable(k1) assert.Nil(t, err) - k1.save = func(key, value string) error { - return fmt.Errorf("save tenant error") + k1.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("save tenant error") } - err = m1.AddTenant(&pb.TenantMeta{}) + _, err = m1.AddTenant(&pb.TenantMeta{}) assert.NotNil(t, err) assert.EqualError(t, err, "save tenant error") - k1.save = func(key, value string) error { - return fmt.Errorf("save proxy error") + k1.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("save proxy error") } - err = m1.AddProxy(&pb.ProxyMeta{}) + _, err = m1.AddProxy(&pb.ProxyMeta{}) assert.NotNil(t, err) assert.EqualError(t, err, "save proxy error") } @@ -168,11 +166,18 @@ func TestMetaTable(t *testing.T) { etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + vtso++ + return vtso + } + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - ekv := etcdkv.NewEtcdKV(etcdCli, rootPath) - assert.NotNil(t, ekv) - mt, err := NewMetaTable(ekv) + skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso) + assert.Nil(t, err) + assert.NotNil(t, skv) + mt, err := NewMetaTable(skv) assert.Nil(t, err) collInfo := &pb.CollectionInfo{ @@ -249,42 +254,43 @@ func TestMetaTable(t *testing.T) { t.Run("add collection", func(t *testing.T) { partInfoDefault.SegmentIDs = []int64{segID} - err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.NotNil(t, err) partInfoDefault.SegmentIDs = []int64{} collInfo.PartitionIDs = []int64{segID} - err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.NotNil(t, err) collInfo.PartitionIDs = []int64{} - err = mt.AddCollection(collInfo, partInfoDefault, nil, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, nil, "") assert.NotNil(t, err) - err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfoDefault, idxInfo, "") assert.Nil(t, err) - collMeta, err := mt.GetCollectionByName("testColl") + collMeta, err := mt.GetCollectionByName("testColl", 0) assert.Nil(t, err) assert.Equal(t, partIDDefault, collMeta.PartitionIDs[0]) assert.Equal(t, 1, len(collMeta.PartitionIDs)) - assert.True(t, mt.HasCollection(collInfo.ID)) + assert.True(t, mt.HasCollection(collInfo.ID, 0)) field, err := mt.GetFieldSchema("testColl", "field110") assert.Nil(t, err) assert.Equal(t, collInfo.Schema.Fields[0].FieldID, field.FieldID) // check DD operation flag - flag, err := mt.client.Load(DDMsgSendPrefix) + flag, err := mt.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "false", flag) }) t.Run("add partition", func(t *testing.T) { - assert.Nil(t, mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "")) + _, err := mt.AddPartition(collID, partInfo.PartitionName, partInfo.PartitionID, "") + assert.Nil(t, err) // check DD operation flag - flag, err := mt.client.Load(DDMsgSendPrefix) + flag, err := mt.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "false", flag) }) @@ -295,16 +301,25 @@ func TestMetaTable(t *testing.T) { 
CollectionID: collID, PartitionID: partID, } - assert.Nil(t, mt.AddSegment(seg)) - assert.NotNil(t, mt.AddSegment(seg)) + _, err := mt.AddSegment(seg) + assert.Nil(t, err) + + _, err = mt.AddSegment(seg) + assert.NotNil(t, err) + seg.ID = segID2 seg.CollectionID = collIDInvalid - assert.NotNil(t, mt.AddSegment(seg)) + _, err = mt.AddSegment(seg) + assert.NotNil(t, err) + seg.CollectionID = collID seg.PartitionID = partIDInvalid - assert.NotNil(t, mt.AddSegment(seg)) + _, err = mt.AddSegment(seg) + assert.NotNil(t, err) + seg.PartitionID = partID - assert.Nil(t, mt.AddSegment(seg)) + _, err = mt.AddSegment(seg) + assert.Nil(t, err) }) t.Run("add segment index", func(t *testing.T) { @@ -314,15 +329,15 @@ func TestMetaTable(t *testing.T) { IndexID: indexID, BuildID: buildID, } - err := mt.AddIndex(&segIdxInfo) + _, err := mt.AddIndex(&segIdxInfo) assert.Nil(t, err) // it's legal to add index twice - err = mt.AddIndex(&segIdxInfo) + _, err = mt.AddIndex(&segIdxInfo) assert.Nil(t, err) segIdxInfo.BuildID = 202 - err = mt.AddIndex(&segIdxInfo) + _, err = mt.AddIndex(&segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID)) }) @@ -408,25 +423,25 @@ func TestMetaTable(t *testing.T) { te := pb.TenantMeta{ ID: 100, } - err := mt.AddTenant(&te) + _, err := mt.AddTenant(&te) assert.Nil(t, err) po := pb.ProxyMeta{ ID: 101, } - err = mt.AddProxy(&po) + _, err = mt.AddProxy(&po) assert.Nil(t, err) - _, err = NewMetaTable(ekv) + _, err = NewMetaTable(skv) assert.Nil(t, err) }) t.Run("drop index", func(t *testing.T) { - idx, ok, err := mt.DropIndex("testColl", "field110", "field110") + _, idx, ok, err := mt.DropIndex("testColl", "field110", "field110") assert.Nil(t, err) assert.True(t, ok) assert.Equal(t, indexID, idx) - _, ok, err = mt.DropIndex("testColl", "field110", "field110-error") + _, _, ok, err = mt.DropIndex("testColl", "field110", "field110-error") assert.Nil(t, err) assert.False(t, ok) @@ -444,24 +459,24 @@ func TestMetaTable(t *testing.T) { }) t.Run("drop partition", func(t *testing.T) { - id, err := mt.DeletePartition(collID, partInfo.PartitionName, "") + _, id, err := mt.DeletePartition(collID, partInfo.PartitionName, "") assert.Nil(t, err) assert.Equal(t, partID, id) // check DD operation flag - flag, err := mt.client.Load(DDMsgSendPrefix) + flag, err := mt.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "false", flag) }) t.Run("drop collection", func(t *testing.T) { - err = mt.DeleteCollection(collIDInvalid, "") + _, err = mt.DeleteCollection(collIDInvalid, "") assert.NotNil(t, err) - err = mt.DeleteCollection(collID, "") + _, err = mt.DeleteCollection(collID, "") assert.Nil(t, err) // check DD operation flag - flag, err := mt.client.Load(DDMsgSendPrefix) + flag, err := mt.client.Load(DDMsgSendPrefix, 0) assert.Nil(t, err) assert.Equal(t, "false", flag) }) @@ -471,42 +486,42 @@ func TestMetaTable(t *testing.T) { mt.client = mockKV t.Run("add collection failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save error") } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, 
partInfo, idxInfo, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") }) t.Run("delete collection failed", func(t *testing.T) { - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) error { - return fmt.Errorf("milti save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(save map[string]string, keys []string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("milti save and remove with prefix error") } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[typeutil.UniqueID]pb.PartitionInfo) mt.indexID2Meta = make(map[int64]pb.IndexInfo) - err = mt.DeleteCollection(collInfo.ID, "") + _, err = mt.DeleteCollection(collInfo.ID, "") assert.NotNil(t, err) assert.EqualError(t, err, "milti save and remove with prefix error") }) t.Run("get collection failed", func(t *testing.T) { - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } collInfo.PartitionIDs = nil - err := mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err := mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) seg := &datapb.SegmentInfo{ @@ -514,10 +529,11 @@ func TestMetaTable(t *testing.T) { CollectionID: collID, PartitionID: partID, } - assert.Nil(t, mt.AddSegment(seg)) + _, err = mt.AddSegment(seg) + assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.GetCollectionByName(collInfo.Schema.Name) + _, err = mt.GetCollectionByName(collInfo.Schema.Name, 0) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection: %s", collInfo.Schema.Name)) @@ -532,130 +548,130 @@ func TestMetaTable(t *testing.T) { }) t.Run("add partition failed", func(t *testing.T) { - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - err = mt.AddPartition(2, "no-part", 22, "") + _, err = mt.AddPartition(2, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, "can't find collection. 
id = 2") coll := mt.collID2Meta[collInfo.ID] coll.PartitionIDs = make([]int64, Params.MaxPartitionNum) mt.collID2Meta[coll.ID] = coll - err = mt.AddPartition(coll.ID, "no-part", 22, "") + _, err = mt.AddPartition(coll.ID, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)) coll.PartitionIDs = []int64{partInfo.PartitionID} mt.collID2Meta[coll.ID] = coll mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) - mockKV.multiSave = func(kvs map[string]string) error { - return fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save error") } - err = mt.AddPartition(coll.ID, "no-part", 22, "") + _, err = mt.AddPartition(coll.ID, "no-part", 22, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "") + _, err = mt.AddPartition(coll.ID, partInfo.PartitionName, 22, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition name = %s already exists", partInfo.PartitionName)) - err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "") + _, err = mt.AddPartition(coll.ID, "no-part", partInfo.PartitionID, "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition id = %d already exists", partInfo.PartitionID)) }) t.Run("has partition failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) - assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName)) + assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName, 0)) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName)) + assert.False(t, mt.HasPartition(collInfo.ID, partInfo.PartitionName, 0)) }) t.Run("delete partition failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - _, err = mt.DeletePartition(collInfo.ID, Params.DefaultPartitionName, "") + _, _, err = mt.DeletePartition(collInfo.ID, 
Params.DefaultPartitionName, "") assert.NotNil(t, err) assert.EqualError(t, err, "default partition cannot be deleted") - _, err = mt.DeletePartition(collInfo.ID, "abc", "") + _, _, err = mt.DeletePartition(collInfo.ID, "abc", "") assert.NotNil(t, err) assert.EqualError(t, err, "partition abc does not exist") pm := mt.partitionID2Meta[partInfo.PartitionID] pm.SegmentIDs = []int64{11, 12, 13} mt.partitionID2Meta[pm.PartitionID] = pm - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error { - return fmt.Errorf("multi save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save and remove with prefix error") } - _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "") + _, _, err = mt.DeletePartition(collInfo.ID, pm.PartitionName, "") assert.NotNil(t, err) assert.EqualError(t, err, "multi save and remove with prefix error") mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.DeletePartition(collInfo.ID, "abc", "") + _, _, err = mt.DeletePartition(collInfo.ID, "abc", "") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("can't find collection id = %d", collInfo.ID)) - _, err = mt.GetPartitionByID(11) + _, err = mt.GetPartitionByID(1, 11, 0) assert.NotNil(t, err) assert.EqualError(t, err, "partition id = 11 not exist") }) t.Run("add segment failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) noPart := pb.PartitionInfo{ @@ -670,7 +686,7 @@ func TestMetaTable(t *testing.T) { CollectionID: collInfo.ID, PartitionID: noPart.PartitionID, } - err = mt.AddSegment(seg) + _, err = mt.AddSegment(seg) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID)) @@ -679,29 +695,29 @@ func TestMetaTable(t *testing.T) { CollectionID: collInfo.ID, PartitionID: partInfo.PartitionID, } - mockKV.save = func(key, value string) error { - return fmt.Errorf("save error") + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("save error") } - err = mt.AddSegment(seg) + _, err = mt.AddSegment(seg) assert.NotNil(t, err) assert.EqualError(t, err, "save error") }) t.Run("add index failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = 
mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) seg := &datapb.SegmentInfo{ @@ -709,7 +725,8 @@ func TestMetaTable(t *testing.T) { CollectionID: collID, PartitionID: partID, } - assert.Nil(t, mt.AddSegment(seg)) + _, err = mt.AddSegment(seg) + assert.Nil(t, err) segIdxInfo := &pb.SegmentIndexInfo{ SegmentID: segID, @@ -717,22 +734,22 @@ func TestMetaTable(t *testing.T) { IndexID: indexID2, BuildID: buildID, } - err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID)) mt.segID2PartitionID = make(map[int64]int64) - err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID)) mt.segID2CollID = make(map[int64]int64) - err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)) @@ -740,46 +757,47 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + assert.Nil(t, err) + _, err = mt.AddSegment(seg) assert.Nil(t, err) - assert.Nil(t, mt.AddSegment(seg)) segIdxInfo.IndexID = indexID - mockKV.save = func(key, value string) error { - return fmt.Errorf("save error") + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("save error") } - err = mt.AddIndex(segIdxInfo) + _, err = mt.AddIndex(segIdxInfo) assert.NotNil(t, err) assert.EqualError(t, err, "save error") }) t.Run("drop index failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) - _, _, err = mt.DropIndex("abc", "abc", "abc") + _, _, _, err = mt.DropIndex("abc", "abc", "abc") assert.NotNil(t, err) assert.EqualError(t, err, "collection name = abc not exist") mt.collName2ID["abc"] = 2 - _, _, err = mt.DropIndex("abc", "abc", "abc") + _, _, _, err = mt.DropIndex("abc", "abc", "abc") assert.NotNil(t, err) assert.EqualError(t, err, "collection name = abc not has meta") - _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc") + _, _, _, err = mt.DropIndex(collInfo.Schema.Name, "abc", "abc") assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection %s doesn't have filed abc", collInfo.Schema.Name)) @@ -796,7 +814,7 @@ func TestMetaTable(t *testing.T) { } mt.collID2Meta[coll.ID] = coll mt.indexID2Meta = make(map[int64]pb.IndexInfo) - idxID, 
isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) + _, idxID, isDroped, err := mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) assert.Zero(t, idxID) assert.False(t, isDroped) assert.Nil(t, err) @@ -804,32 +822,32 @@ func TestMetaTable(t *testing.T) { err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.partitionID2Meta = make(map[int64]pb.PartitionInfo) - mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) error { - return fmt.Errorf("multi save and remove with prefix error") + mockKV.multiSaveAndRemoveWithPrefix = func(saves map[string]string, removals []string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save and remove with prefix error") } - _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) + _, _, _, err = mt.DropIndex(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0].IndexName) assert.NotNil(t, err) assert.EqualError(t, err, "multi save and remove with prefix error") }) t.Run("get segment index info by id", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) _, err = mt.GetSegmentIndexInfoByID(segID2, fieldID, "abc") @@ -849,14 +867,16 @@ func TestMetaTable(t *testing.T) { CollectionID: collID, PartitionID: partID, } - assert.Nil(t, mt.AddSegment(segInfo)) + _, err = mt.AddSegment(segInfo) + assert.Nil(t, err) segIdx := &pb.SegmentIndexInfo{ SegmentID: segID, FieldID: fieldID, IndexID: indexID, BuildID: buildID, } - assert.Nil(t, mt.AddIndex(segIdx)) + _, err = mt.AddIndex(segIdx) + assert.Nil(t, err) idx, err := mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, idxInfo[0].IndexName) assert.Nil(t, err) assert.Equal(t, segIdx.IndexID, idx.IndexID) @@ -871,20 +891,20 @@ func TestMetaTable(t *testing.T) { }) t.Run("get field schema failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err := mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) @@ -899,7 +919,7 @@ 
func TestMetaTable(t *testing.T) { }) t.Run("is segment indexed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } err := mt.reloadFromKV() @@ -923,7 +943,7 @@ func TestMetaTable(t *testing.T) { }) t.Run("get not indexed segments", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } err := mt.reloadFromKV() @@ -945,17 +965,17 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } - mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, "no-field", idx) @@ -969,18 +989,18 @@ func TestMetaTable(t *testing.T) { assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", idxInfo[0].IndexID)) mt.indexID2Meta = bakMeta - mockKV.multiSave = func(kvs map[string]string) error { - return fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save error") } _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx) assert.NotNil(t, err) assert.EqualError(t, err, "multi save error") - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) coll := mt.collID2Meta[collInfo.ID] coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1}) @@ -998,8 +1018,8 @@ func TestMetaTable(t *testing.T) { mt.indexID2Meta[anotherIdx.IndexID] = anotherIdx idx.IndexName = idxInfo[0].IndexName - mockKV.multiSave = func(kvs map[string]string) error { - return fmt.Errorf("multi save error") + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("multi save error") } _, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx) assert.NotNil(t, err) @@ -1007,7 +1027,7 @@ func TestMetaTable(t *testing.T) { }) t.Run("get index by name failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } err := mt.reloadFromKV() @@ -1018,17 +1038,17 @@ func TestMetaTable(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "collection abc not found") - mockKV.multiSave = func(kvs map[string]string) error { - return nil + mockKV.multiSave = func(kvs map[string]string) (typeutil.Timestamp, error) { + return 0, nil } 
- mockKV.save = func(key, value string) error { - return nil + mockKV.save = func(key, value string) (typeutil.Timestamp, error) { + return 0, nil } err = mt.reloadFromKV() assert.Nil(t, err) collInfo.PartitionIDs = nil - err = mt.AddCollection(collInfo, partInfo, idxInfo, "") + _, err = mt.AddCollection(collInfo, partInfo, idxInfo, "") assert.Nil(t, err) mt.indexID2Meta = make(map[int64]pb.IndexInfo) _, _, err = mt.GetIndexByName(collInfo.Schema.Name, idxInfo[0].IndexName) @@ -1041,7 +1061,7 @@ func TestMetaTable(t *testing.T) { }) t.Run("add flused segment failed", func(t *testing.T) { - mockKV.loadWithPrefix = func(key string) ([]string, []string, error) { + mockKV.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { return nil, nil, nil } err := mt.reloadFromKV() diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 1b584d4525cd5e34dc2c7a3dc0faf086b4e020dc..93cb2a571f4c2241d0fe8349cb3fa7a8badb083b 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -222,7 +222,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return err } - err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr) + _, err = t.core.MetaTable.AddCollection(&collInfo, &partInfo, idxInfo, ddOpStr) if err != nil { return err } @@ -266,7 +266,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err != nil { return err } @@ -286,7 +286,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return err } - err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr) + _, err = t.core.MetaTable.DeleteCollection(collMeta.ID, ddOpStr) if err != nil { return err } @@ -336,7 +336,7 @@ func (t *HasCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasCollection { return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - _, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + _, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err == nil { t.HasCollection = true } else { @@ -375,12 +375,12 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error { var err error if t.Req.CollectionName != "" { - collInfo, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + collInfo, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err != nil { return err } } else { - collInfo, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) + collInfo, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0) if err != nil { return err } @@ -427,7 +427,7 @@ func (t *ShowCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowCollections { return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - coll, err := t.core.MetaTable.ListCollections() + coll, err := t.core.MetaTable.ListCollections(0) if err != nil { return err } @@ -460,7 +460,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreatePartition { return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - collMeta, err := 
t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err != nil { return err } @@ -486,7 +486,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return err } - err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr) + _, err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ddOpStr) if err != nil { return err } @@ -528,11 +528,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropPartition { return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + collInfo, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err != nil { return err } - partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName) + partInfo, err := t.core.MetaTable.GetPartitionByName(collInfo.ID, t.Req.PartitionName, 0) if err != nil { return err } @@ -554,7 +554,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return err } - _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr) + _, _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ddOpStr) if err != nil { return err } @@ -597,11 +597,11 @@ func (t *HasPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasPartition { return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) if err != nil { return err } - t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName) + t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName, 0) return nil } @@ -634,15 +634,15 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error { var coll *etcdpb.CollectionInfo var err error if t.Req.CollectionName == "" { - coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) + coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0) } else { - coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName, 0) } if err != nil { return err } for _, partID := range coll.PartitionIDs { - partMeta, err := t.core.MetaTable.GetPartitionByID(partID) + partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, partID, 0) if err != nil { return err } @@ -678,7 +678,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeSegment { return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } - coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) + coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0) if err != nil { return err } @@ -687,7 +687,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { if exist { break } - partMeta, err := t.core.MetaTable.GetPartitionByID(partID) + partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, partID, 0) if err != nil { return err } @@ -738,7 +738,7 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowSegments { return fmt.Errorf("show segments, msg 
type = %s", commonpb.MsgType_name[int32(t.Type())]) } - coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) + coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID, 0) if err != nil { return err } @@ -752,7 +752,7 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { if !exist { return fmt.Errorf("partition id = %d not belong to collection id = %d", t.Req.PartitionID, t.Req.CollectionID) } - partMeta, err := t.core.MetaTable.GetPartitionByID(t.Req.PartitionID) + partMeta, err := t.core.MetaTable.GetPartitionByID(coll.ID, t.Req.PartitionID, 0) if err != nil { return err } @@ -900,6 +900,6 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { if err != nil { return err } - _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) + _, _, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName) return err }