未验证 提交 d1d5f9e4 编写于 作者: Z zhenshan.cao 提交者: GitHub

Replace proto.MarshalTextString with proto.Marshal (#8542)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 db944cd0
......@@ -81,7 +81,7 @@ verifiers: build-cpp getdeps cppcheck fmt static-check ruleguard
# Build various components locally.
binlog:
@echo "Building binlog ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/tools/binlog/main.go 1>/dev/null
BUILD_TAGS = $(shell git describe --tags --always --dirty="-dev")
BUILD_TIME = $(shell date --utc)
......
package main
import (
"flag"
"fmt"
"os"
"strings"
"github.com/milvus-io/milvus/internal/datacoord"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type etcdEndPoints []string
func (i *etcdEndPoints) String() string {
return strings.Join(*i, ",")
}
func (i *etcdEndPoints) Set(value string) error {
*i = append(*i, value)
return nil
}
func main() {
var etcdEndPoints etcdEndPoints
var metaRootPath, segmentBinlogSubPath string
flagSet := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
flagSet.Var(&etcdEndPoints, "etcdEndPoints", "endpoints of etcd")
flagSet.StringVar(&metaRootPath, "metaRootPath", "", "root path of meta on etcd")
flagSet.StringVar(&segmentBinlogSubPath, "segmentBinlogSubPath", "", "binlog path prefix on etcd")
flagSet.Usage = func() {
fmt.Fprintf(flagSet.Output(), "All flags is optional. If you did not change it in config files, you do not need to set the flag.\n")
flagSet.PrintDefaults()
}
if len(os.Args) > 0 {
flagSet.Parse(os.Args[1:])
}
datacoord.Params.Init()
if len(etcdEndPoints) != 0 {
datacoord.Params.EtcdEndpoints = etcdEndPoints
}
if len(metaRootPath) != 0 {
datacoord.Params.MetaRootPath = metaRootPath
}
if len(segmentBinlogSubPath) != 0 {
datacoord.Params.SegmentBinlogSubPath = segmentBinlogSubPath
}
etcdKV, err := etcdkv.NewEtcdKV(datacoord.Params.EtcdEndpoints, datacoord.Params.MetaRootPath)
if err != nil {
log.Error("failed to connect to etcd", zap.Error(err))
return
}
meta, err := datacoord.NewMeta(etcdKV)
if err != nil {
log.Error("failed to create meta", zap.Error(err))
return
}
helper := datacoord.NewMoveBinlogPathHelper(etcdKV, meta)
if err = helper.Execute(); err != nil {
return
}
log.Info("finished")
}
......@@ -165,7 +165,7 @@ func (c *Cluster) loadFromKV() error {
for _, v := range values {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(v, info); err != nil {
if err := proto.Unmarshal([]byte(v), info); err != nil {
return err
}
......@@ -177,7 +177,7 @@ func (c *Cluster) loadFromKV() error {
//TODO add not value error check
if dn != "" {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(dn, info); err != nil {
if err := proto.Unmarshal([]byte(dn), info); err != nil {
return err
}
c.chanBuffer = info.Channels
......@@ -288,7 +288,7 @@ func (c *Cluster) handleEvent(node *NodeInfo) {
c.nodes.SetWatched(node.Info.GetVersion(), parseChannelsFromReq(req))
c.mu.Unlock()
if err = c.saveNode(node); err != nil {
log.Warn("failed to save node info", zap.Any("node", node))
log.Warn("failed to save node info", zap.Any("node", node), zap.Error(err))
continue
}
case Flush:
......@@ -416,7 +416,10 @@ func (c *Cluster) handleRegister(n *NodeInfo) {
zap.Any("nodes", nodes),
zap.Any("buffer", c.chanBuffer))
go c.handleEvent(n)
c.txnSaveNodesAndBuffer(nodes, c.chanBuffer)
err := c.txnSaveNodesAndBuffer(nodes, c.chanBuffer)
if err != nil {
log.Warn("DataCoord Cluster handleRegister txnSaveNodesAndBuffer", zap.Error(err))
}
for _, node := range nodes {
c.nodes.SetNode(node.Info.GetVersion(), node)
}
......@@ -453,7 +456,10 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) {
rets = c.unregisterPolicy(cNodes, node)
}
log.Debug("delta changes after unregister policy", zap.Any("nodes", rets), zap.Any("buffer", c.chanBuffer))
c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
err := c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
if err != nil {
log.Warn("DataCoord Cluster handleUnRegister txnSaveNodesAndBuffer", zap.Error(err))
}
for _, node := range rets {
c.nodes.SetNode(node.Info.GetVersion(), node)
}
......@@ -478,7 +484,10 @@ func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
} else {
rets = c.assignPolicy(cNodes, channel, collectionID)
}
c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
err := c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
if err != nil {
log.Warn("DataCoord Cluster handleWatchChannel txnSaveNodesAndBuffer", zap.Error(err))
}
for _, node := range rets {
c.nodes.SetNode(node.Info.GetVersion(), node)
}
......@@ -583,8 +592,11 @@ func (c *Cluster) watch(n *NodeInfo) {
func (c *Cluster) saveNode(n *NodeInfo) error {
key := fmt.Sprintf("%s%d", clusterPrefix, n.Info.GetVersion())
value := proto.MarshalTextString(n.Info)
return c.kv.Save(key, value)
value, err := proto.Marshal(n.Info)
if err != nil {
return err
}
return c.kv.Save(key, string(value))
}
func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.ChannelStatus) error {
......@@ -594,16 +606,22 @@ func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.Chan
data := make(map[string]string)
for _, n := range nodes {
key := fmt.Sprintf("%s%d", clusterPrefix, n.Info.GetVersion())
value := proto.MarshalTextString(n.Info)
data[key] = value
value, err := proto.Marshal(n.Info)
if err != nil {
return fmt.Errorf("marshal failed key:%s, err:%w", key, err)
}
data[key] = string(value)
}
// short cut, reusing datainfo to store array of channel status
bufNode := &datapb.DataNodeInfo{
Channels: buffer,
}
data[clusterBuffer] = proto.MarshalTextString(bufNode)
buffData, err := proto.Marshal(bufNode)
if err != nil {
return fmt.Errorf("marshal bufNode failed:%w", err)
}
data[clusterBuffer] = string(buffData)
return c.kv.MultiSave(data)
}
......
......@@ -15,6 +15,9 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
......@@ -57,10 +60,9 @@ func (m *meta) reloadFromKV() error {
for _, value := range values {
segmentInfo := &datapb.SegmentInfo{}
// TODO deprecate all proto text marshal/unmarsahl
err = proto.UnmarshalText(value, segmentInfo)
err = proto.Unmarshal([]byte(value), segmentInfo)
if err != nil {
return fmt.Errorf("DataCoord reloadFromKV UnMarshalText datapb.SegmentInfo err:%w", err)
return fmt.Errorf("DataCoord reloadFromKV UnMarshal datapb.SegmentInfo err:%w", err)
}
m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
}
......@@ -208,9 +210,13 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
for id := range modSegments {
if segment := m.segments.GetSegment(id); segment != nil {
segBytes := proto.MarshalTextString(segment.SegmentInfo)
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
log.Error("DataCoord UpdateFlushSegmentsInfo marshal failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord UpdateFlushSegmentsInfo segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kv[key] = segBytes
kv[key] = string(segBytes)
}
}
......@@ -366,17 +372,25 @@ func (m *meta) MoveSegmentBinlogs(segmentID UniqueID, oldPathPrefix string, fiel
if segment := m.segments.GetSegment(segmentID); segment != nil {
k := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kv[k] = proto.MarshalTextString(segment.SegmentInfo)
v, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
log.Error("DataCoord MoveSegmentBinlogs marshal failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord MoveSegmentBinlogs segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
kv[k] = string(v)
}
return m.client.MultiSaveAndRemoveWithPrefix(kv, removals)
}
// saveSegmentInfo utility function saving segment info into kv store
func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
segBytes := proto.MarshalTextString(segment.SegmentInfo)
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
log.Error("DataCoord saveSegmentInfo marshal failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord saveSegmentInfo segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Save(key, segBytes)
return m.client.Save(key, string(segBytes))
}
// removeSegmentInfo utility function removing segment info from kv store
......
package datacoord
import (
"fmt"
"path"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
type MoveBinlogPathHelper struct {
kv kv.TxnKV
meta *meta
}
func NewMoveBinlogPathHelper(kv kv.TxnKV, meta *meta) *MoveBinlogPathHelper {
return &MoveBinlogPathHelper{
kv: kv,
meta: meta,
}
}
func (h *MoveBinlogPathHelper) Execute() error {
segmentIds := h.meta.ListSegmentIDs()
if len(segmentIds) == 1 {
log.Debug("there's 1 segment's binlogs to move", zap.Int64("segmentID", segmentIds[0]))
} else {
log.Debug(fmt.Sprintf("there are %d segments' binlogs to move", len(segmentIds)))
}
for _, id := range segmentIds {
m := make(map[UniqueID][]string)
p := path.Join(Params.SegmentBinlogSubPath, strconv.FormatInt(id, 10)) + "/" // prefix/id/ instead of prefix/id
_, values, err := h.kv.LoadWithPrefix(p)
if err != nil {
log.Error("failed to load prefix", zap.String("prefix", p), zap.Error(err))
return err
}
for _, v := range values {
tMeta := &datapb.SegmentFieldBinlogMeta{}
if err := proto.UnmarshalText(v, tMeta); err != nil {
log.Error("failed to unmarshal", zap.Error(err))
return err
}
m[tMeta.FieldID] = append(m[tMeta.FieldID], tMeta.BinlogPath)
}
if err := h.meta.MoveSegmentBinlogs(id, p, m); err != nil {
log.Error("failed to save binlogs in meta", zap.Int64("segmentID", id), zap.Error(err))
return err
}
log.Debug(fmt.Sprintf("success to move binlogs of segment %d", id))
}
return nil
}
package datacoord
import (
"path"
"testing"
"github.com/gogo/protobuf/proto"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
func TestMoveBinlogPathHelper_Start(t *testing.T) {
Params.Init()
t.Run("test normal move", func(t *testing.T) {
var err error
kv := memkv.NewMemoryKV()
segment := &datapb.SegmentInfo{ID: 0}
err = kv.Save(buildSegmentPath(0, 0, 0), proto.MarshalTextString(segment))
assert.Nil(t, err)
binlogMeta := &datapb.SegmentFieldBinlogMeta{
FieldID: 0,
BinlogPath: "path1",
}
err = kv.Save(path.Join(Params.SegmentBinlogSubPath, "0", "0", "path1"), proto.MarshalTextString(binlogMeta))
assert.Nil(t, err)
meta, err := NewMeta(kv)
assert.Nil(t, err)
helper := NewMoveBinlogPathHelper(kv, meta)
err = helper.Execute()
assert.Nil(t, err)
pbstr, err := kv.Load(buildSegmentPath(0, 0, 0))
assert.Nil(t, err)
err = proto.UnmarshalText(pbstr, segment)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(segment.Binlogs))
assert.EqualValues(t, 0, segment.Binlogs[0].FieldID)
assert.EqualValues(t, []string{"path1"}, segment.Binlogs[0].Binlogs)
})
}
......@@ -231,15 +231,8 @@ func (s *Server) Start() error {
}
s.startServerLoop()
helper := NewMoveBinlogPathHelper(s.kvClient, s.meta)
if err := helper.Execute(); err != nil {
return err
}
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()
atomic.StoreInt64(&s.isServing, ServerStateHealthy)
log.Debug("dataCoordinator startup success")
......
......@@ -234,7 +234,11 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
return
}
watchInfo.State = datapb.ChannelWatchState_Complete
v, _ := proto.Marshal(&watchInfo)
v, err := proto.Marshal(&watchInfo)
if err != nil {
log.Warn("fail to Marshal watchInfo", zap.String("key", string(evt.Kv.Key)), zap.Error(err))
return
}
err = node.kvClient.Save(fmt.Sprintf("channel/%d/%s", node.NodeID, watchInfo.Vchan.ChannelName), string(v))
if err != nil {
log.Warn("fail to change WatchState to complete", zap.String("key", string(evt.Kv.Key)), zap.Error(err))
......
......@@ -704,7 +704,7 @@ func (i *IndexCoord) watchMetaLoop() {
for _, event := range resp.Events {
eventRevision := event.Kv.Version
indexMeta := &indexpb.IndexMeta{}
err := proto.UnmarshalText(string(event.Kv.Value), indexMeta)
err := proto.Unmarshal(event.Kv.Value, indexMeta)
indexBuildID := indexMeta.IndexBuildID
log.Debug("IndexCoord watchMetaLoop", zap.Any("event.Key", event.Kv.Key),
zap.Any("event.V", indexMeta), zap.Int64("IndexBuildID", indexBuildID), zap.Error(err))
......
......@@ -49,7 +49,7 @@ func TestIndexCoord(t *testing.T) {
assert.Nil(t, err)
ic.reqTimeoutInterval = time.Second * 10
ic.durationInterval = time.Second
ic.assignTaskInterval = time.Second
ic.assignTaskInterval = 200 * time.Millisecond
ic.taskLimit = 20
Params.Init()
err = ic.Register()
......@@ -181,6 +181,7 @@ func TestIndexCoord(t *testing.T) {
t.Run("Recycle IndexMeta", func(t *testing.T) {
indexMeta := ic.metaTable.GetIndexMetaByIndexBuildID(indexBuildID)
for indexMeta != nil {
log.Info("RecycleIndexMeta", zap.Any("meta", indexMeta))
indexMeta = ic.metaTable.GetIndexMetaByIndexBuildID(indexBuildID)
time.Sleep(100 * time.Millisecond)
}
......
......@@ -70,7 +70,7 @@ func (mt *metaTable) reloadFromKV() error {
for i := 0; i < len(values); i++ {
indexMeta := indexpb.IndexMeta{}
err = proto.UnmarshalText(values[i], &indexMeta)
err = proto.Unmarshal([]byte(values[i]), &indexMeta)
if err != nil {
return fmt.Errorf("IndexCoord metaTable reloadFromKV UnmarshalText indexpb.IndexMeta err:%w", err)
}
......@@ -86,10 +86,12 @@ func (mt *metaTable) reloadFromKV() error {
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveIndexMeta(meta *Meta) error {
value := proto.MarshalTextString(meta.indexMeta)
value, err := proto.Marshal(meta.indexMeta)
if err != nil {
return err
}
key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10)
err := mt.client.CompareVersionAndSwap(key, meta.revision, value)
err = mt.client.CompareVersionAndSwap(key, meta.revision, string(value))
log.Debug("IndexCoord metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err))
if err != nil {
return err
......@@ -115,7 +117,7 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
return nil, errors.New("meta doesn't exist in KV")
}
im := &indexpb.IndexMeta{}
err = proto.UnmarshalText(values[0], im)
err = proto.Unmarshal([]byte(values[0]), im)
if err != nil {
return nil, err
}
......
......@@ -44,9 +44,10 @@ func TestMetaTable(t *testing.T) {
Version: 10,
Recycled: false,
}
value := proto.MarshalTextString(indexMeta1)
value, err := proto.Marshal(indexMeta1)
assert.Nil(t, err)
key := "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
err = etcdKV.Save(key, value)
err = etcdKV.Save(key, string(value))
assert.Nil(t, err)
metaTable, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
......@@ -89,9 +90,10 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
indexMeta1.NodeID = 2
value = proto.MarshalTextString(indexMeta1)
value, err = proto.Marshal(indexMeta1)
assert.Nil(t, err)
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
err = etcdKV.Save(key, value)
err = etcdKV.Save(key, string(value))
assert.Nil(t, err)
err = metaTable.BuildIndex(indexMeta1.IndexBuildID, 1)
assert.Nil(t, err)
......@@ -102,9 +104,10 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err)
indexMeta1.Version = indexMeta1.Version + 1
value = proto.MarshalTextString(indexMeta1)
value, err = proto.Marshal(indexMeta1)
assert.Nil(t, err)
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
err = etcdKV.Save(key, value)
err = etcdKV.Save(key, string(value))
assert.Nil(t, err)
err = metaTable.UpdateVersion(indexMeta1.IndexBuildID)
assert.Nil(t, err)
......@@ -112,9 +115,10 @@ func TestMetaTable(t *testing.T) {
t.Run("MarkIndexAsDeleted", func(t *testing.T) {
indexMeta1.Version = indexMeta1.Version + 1
value = proto.MarshalTextString(indexMeta1)
value, err = proto.Marshal(indexMeta1)
assert.Nil(t, err)
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
err = etcdKV.Save(key, value)
err = etcdKV.Save(key, string(value))
assert.Nil(t, err)
err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID)
assert.Nil(t, err)
......@@ -139,9 +143,10 @@ func TestMetaTable(t *testing.T) {
t.Run("UpdateRecycleState", func(t *testing.T) {
indexMeta1.Version = indexMeta1.Version + 1
value = proto.MarshalTextString(indexMeta1)
value, err = proto.Marshal(indexMeta1)
assert.Nil(t, err)
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
err = etcdKV.Save(key, value)
err = etcdKV.Save(key, string(value))
assert.Nil(t, err)
err = metaTable.UpdateRecycleState(indexMeta1.IndexBuildID)
......
......@@ -70,14 +70,18 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = proto.UnmarshalText(values[0], &indexMeta)
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
if err != nil {
return err
}
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta.State = commonpb.IndexState_Failed
metaData, err := proto.Marshal(&indexMeta)
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
string(metaData))
if err != nil {
return err
}
......@@ -94,14 +98,18 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = proto.UnmarshalText(values[0], &indexMeta)
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
if err != nil {
return err
}
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta.State = commonpb.IndexState_Failed
metaData, err := proto.Marshal(&indexMeta)
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
string(metaData))
if err != nil {
return err
}
......@@ -111,15 +119,19 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = proto.UnmarshalText(values2[0], &indexMeta2)
err = proto.Unmarshal([]byte(values2[0]), &indexMeta2)
if err != nil {
return err
}
indexMeta2.Version = indexMeta.Version + 1
indexMeta2.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta2.State = commonpb.IndexState_Finished
metaData2, err := proto.Marshal(&indexMeta2)
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0],
proto.MarshalTextString(&indexMeta2))
string(metaData2))
if err != nil {
return err
}
......
......@@ -161,8 +161,9 @@ func TestIndexNodeMockFiled(t *testing.T) {
Version: 0,
}
value := proto.MarshalTextString(indexMeta)
err := inm.etcdKV.Save(key, value)
value, err := proto.Marshal(indexMeta)
assert.Nil(t, err)
err = inm.etcdKV.Save(key, string(value))
assert.Nil(t, err)
resp, err := inm.CreateIndex(ctx, req)
assert.Nil(t, err)
......
......@@ -128,8 +128,9 @@ func TestIndexNode(t *testing.T) {
Version: 1,
}
value := proto.MarshalTextString(indexMeta)
err = in.etcdKV.Save(metaPath1, value)
value, err := proto.Marshal(indexMeta)
assert.Nil(t, err)
err = in.etcdKV.Save(metaPath1, string(value))
assert.Nil(t, err)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID1,
......@@ -160,20 +161,20 @@ func TestIndexNode(t *testing.T) {
},
}
status, err := in.CreateIndex(ctx, req)
assert.Nil(t, err)
status, err2 := in.CreateIndex(ctx, req)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
strValue, err3 := in.etcdKV.Load(metaPath1)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Finished {
time.Sleep(100 * time.Millisecond)
value, err = in.etcdKV.Load(metaPath1)
strValue, err := in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
......@@ -242,8 +243,9 @@ func TestIndexNode(t *testing.T) {
Version: 1,
}
value := proto.MarshalTextString(indexMeta)
err = in.etcdKV.Save(metaPath2, value)
value, err := proto.Marshal(indexMeta)
assert.Nil(t, err)
err = in.etcdKV.Save(metaPath2, string(value))
assert.Nil(t, err)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID2,
......@@ -270,20 +272,20 @@ func TestIndexNode(t *testing.T) {
},
}
status, err := in.CreateIndex(ctx, req)
assert.Nil(t, err)
status, err2 := in.CreateIndex(ctx, req)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
strValue, err3 := in.etcdKV.Load(metaPath2)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Finished {
time.Sleep(100 * time.Millisecond)
value, err = in.etcdKV.Load(metaPath2)
strValue, err = in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
......@@ -354,8 +356,9 @@ func TestIndexNode(t *testing.T) {
MarkDeleted: true,
}
value := proto.MarshalTextString(indexMeta)
err = in.etcdKV.Save(metaPath3, value)
value, err := proto.Marshal(indexMeta)
assert.Nil(t, err)
err = in.etcdKV.Save(metaPath3, string(value))
assert.Nil(t, err)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID1,
......@@ -386,20 +389,20 @@ func TestIndexNode(t *testing.T) {
},
}
status, err := in.CreateIndex(ctx, req)
assert.Nil(t, err)
status, err2 := in.CreateIndex(ctx, req)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath3)
assert.Nil(t, err)
strValue, err3 := in.etcdKV.Load(metaPath3)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Finished {
time.Sleep(time.Second)
value, err = in.etcdKV.Load(metaPath3)
time.Sleep(100 * time.Millisecond)
strValue, err := in.etcdKV.Load(metaPath3)
assert.Nil(t, err)
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
......@@ -528,8 +531,9 @@ func TestCreateIndexFailed(t *testing.T) {
Version: 1,
}
value := proto.MarshalTextString(indexMeta)
err = in.etcdKV.Save(metaPath1, value)
value, err := proto.Marshal(indexMeta)
assert.Nil(t, err)
err = in.etcdKV.Save(metaPath1, string(value))
assert.Nil(t, err)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID1,
......@@ -564,20 +568,20 @@ func TestCreateIndexFailed(t *testing.T) {
},
}
status, err := in.CreateIndex(ctx, req)
assert.Nil(t, err)
status, err2 := in.CreateIndex(ctx, req)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
strValue, err3 := in.etcdKV.Load(metaPath1)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Failed {
time.Sleep(100 * time.Millisecond)
value, err = in.etcdKV.Load(metaPath1)
strValue, err = in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
......@@ -645,8 +649,9 @@ func TestCreateIndexFailed(t *testing.T) {
Version: 1,
}
value2 := proto.MarshalTextString(indexMeta2)
err = in.etcdKV.Save(metaPath2, value2)
value2, err := proto.Marshal(indexMeta2)
assert.Nil(t, err)
err = in.etcdKV.Save(metaPath2, string(value2))
assert.Nil(t, err)
req2 := &indexpb.CreateIndexRequest{
......@@ -682,20 +687,20 @@ func TestCreateIndexFailed(t *testing.T) {
},
}
status, err := in.CreateIndex(ctx, req2)
assert.Nil(t, err)
status, err2 := in.CreateIndex(ctx, req2)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err := in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
strValue, err3 := in.etcdKV.Load(metaPath2)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Failed {
time.Sleep(100 * time.Millisecond)
value, err = in.etcdKV.Load(metaPath2)
strValue, err = in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
err = proto.UnmarshalText(value, &indexMetaTmp)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
......
......@@ -126,8 +126,11 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
zap.Error(err), zap.Any("pre", pre))
return err
}
if len(values) == 0 {
return fmt.Errorf("IndexNode checkIndexMeta the indexMeta is empty")
}
log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre))
err = proto.UnmarshalText(values[0], &indexMeta)
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
if err != nil {
log.Error("IndexNode checkIndexMeta Unmarshal", zap.Error(err))
return err
......@@ -139,8 +142,11 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
}
if indexMeta.MarkDeleted {
indexMeta.State = commonpb.IndexState_Finished
v := proto.MarshalTextString(&indexMeta)
err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], v)
v, err := proto.Marshal(&indexMeta)
if err != nil {
return err
}
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(v))
if err != nil {
return err
}
......@@ -159,8 +165,15 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
indexMeta.FailReason = it.err.Error()
}
log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State))
var metaValue []byte
metaValue, err = proto.Marshal(&indexMeta)
if err != nil {
log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State),
zap.Any("proto.Marshal failed:", err))
return err
}
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
string(metaValue))
log.Debug("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err))
return err
}
......@@ -380,7 +393,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
return err
}
indexMeta := indexpb.IndexMeta{}
err = proto.UnmarshalText(v, &indexMeta)
err = proto.Unmarshal([]byte(v), &indexMeta)
if err != nil {
log.Error("IndexNode Unmarshal indexMeta error ", zap.Error(err))
return err
......
......@@ -174,7 +174,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
}
for _, value := range collectionValues {
collectionInfo := &querypb.CollectionInfo{}
err = proto.UnmarshalText(value, collectionInfo)
err = proto.Unmarshal([]byte(value), collectionInfo)
if err != nil {
return err
}
......
......@@ -90,9 +90,10 @@ func TestReloadClusterFromKV(t *testing.T) {
collectionInfo := &querypb.CollectionInfo{
CollectionID: defaultCollectionID,
}
collectionBlobs := proto.MarshalTextString(collectionInfo)
collectionBlobs, err := proto.Marshal(collectionInfo)
assert.Nil(t, err)
nodeKey := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, 100)
kvs[nodeKey] = collectionBlobs
kvs[nodeKey] = string(collectionBlobs)
err = kv.MultiSave(kvs)
assert.Nil(t, err)
......
......@@ -113,7 +113,7 @@ func (m *MetaReplica) reloadFromKV() error {
return err
}
collectionInfo := &querypb.CollectionInfo{}
err = proto.UnmarshalText(collectionValues[index], collectionInfo)
err = proto.Unmarshal([]byte(collectionValues[index]), collectionInfo)
if err != nil {
return err
}
......@@ -130,7 +130,7 @@ func (m *MetaReplica) reloadFromKV() error {
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.UnmarshalText(segmentValues[index], segmentInfo)
err = proto.Unmarshal([]byte(segmentValues[index]), segmentInfo)
if err != nil {
return err
}
......@@ -147,7 +147,7 @@ func (m *MetaReplica) reloadFromKV() error {
return err
}
queryChannelInfo := &querypb.QueryChannelInfo{}
err = proto.UnmarshalText(queryChannelValues[index], queryChannelInfo)
err = proto.Unmarshal([]byte(queryChannelValues[index]), queryChannelInfo)
if err != nil {
return err
}
......@@ -702,10 +702,13 @@ func (m *MetaReplica) setLoadPercentage(collectionID UniqueID, partitionID Uniqu
//}
func saveGlobalCollectionInfo(collectionID UniqueID, info *querypb.CollectionInfo, kv kv.MetaKv) error {
infoBytes := proto.MarshalTextString(info)
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", collectionMetaPrefix, collectionID)
return kv.Save(key, infoBytes)
return kv.Save(key, string(infoBytes))
}
func removeGlobalCollectionInfo(collectionID UniqueID, kv kv.MetaKv) error {
......@@ -714,10 +717,13 @@ func removeGlobalCollectionInfo(collectionID UniqueID, kv kv.MetaKv) error {
}
func saveSegmentInfo(segmentID UniqueID, info *querypb.SegmentInfo, kv kv.MetaKv) error {
infoBytes := proto.MarshalTextString(info)
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", segmentMetaPrefix, segmentID)
return kv.Save(key, infoBytes)
return kv.Save(key, string(infoBytes))
}
func removeSegmentInfo(segmentID UniqueID, kv kv.MetaKv) error {
......@@ -726,13 +732,11 @@ func removeSegmentInfo(segmentID UniqueID, kv kv.MetaKv) error {
}
func saveQueryChannelInfo(collectionID UniqueID, info *querypb.QueryChannelInfo, kv kv.MetaKv) error {
infoBytes := proto.MarshalTextString(info)
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
return kv.Save(key, infoBytes)
return kv.Save(key, string(infoBytes))
}
//func removeQueryChannelInfo(collectionID UniqueID, kv *etcdkv.EtcdKV) error {
// key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
// return kv.Remove(key)
//}
......@@ -304,23 +304,26 @@ func TestReloadMetaFromKV(t *testing.T) {
collectionInfo := &querypb.CollectionInfo{
CollectionID: defaultCollectionID,
}
collectionBlobs := proto.MarshalTextString(collectionInfo)
collectionBlobs, err := proto.Marshal(collectionInfo)
assert.Nil(t, err)
collectionKey := fmt.Sprintf("%s/%d", collectionMetaPrefix, defaultCollectionID)
kvs[collectionKey] = collectionBlobs
kvs[collectionKey] = string(collectionBlobs)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
}
segmentBlobs := proto.MarshalTextString(segmentInfo)
segmentBlobs, err := proto.Marshal(segmentInfo)
assert.Nil(t, err)
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, defaultSegmentID)
kvs[segmentKey] = segmentBlobs
kvs[segmentKey] = string(segmentBlobs)
queryChannelInfo := &querypb.QueryChannelInfo{
CollectionID: defaultCollectionID,
}
queryChannelBlobs := proto.MarshalTextString(queryChannelInfo)
queryChannelBlobs, err := proto.Marshal(queryChannelInfo)
assert.Nil(t, err)
queryChannelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, defaultCollectionID)
kvs[queryChannelKey] = queryChannelBlobs
kvs[queryChannelKey] = string(queryChannelBlobs)
err = kv.MultiSave(kvs)
assert.Nil(t, err)
......
......@@ -302,7 +302,7 @@ func (qc *QueryCoord) watchMetaLoop() {
log.Error("watch MetaReplica loop error when get segmentID", zap.Any("error", err.Error()))
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo)
err = proto.Unmarshal(event.Kv.Value, segmentInfo)
if err != nil {
log.Error("watch MetaReplica loop error when unmarshal", zap.Any("error", err.Error()))
}
......
......@@ -146,9 +146,10 @@ func TestWatchNodeLoop(t *testing.T) {
collectionInfo := &querypb.CollectionInfo{
CollectionID: defaultCollectionID,
}
collectionBlobs := proto.MarshalTextString(collectionInfo)
collectionBlobs, err := proto.Marshal(collectionInfo)
assert.Nil(t, err)
nodeKey := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, 100)
kvs[nodeKey] = collectionBlobs
kvs[nodeKey] = string(collectionBlobs)
err = kv.MultiSave(kvs)
assert.Nil(t, err)
......
......@@ -578,10 +578,14 @@ func (qn *queryNode) releaseSegments(ctx context.Context, in *querypb.ReleaseSeg
//****************************************************//
func saveNodeCollectionInfo(collectionID UniqueID, info *querypb.CollectionInfo, nodeID int64, kv *etcdkv.EtcdKV) error {
infoBytes := proto.MarshalTextString(info)
infoBytes, err := proto.Marshal(info)
if err != nil {
log.Error("QueryNode::saveNodeCollectionInfo ", zap.Error(err))
return err
}
key := fmt.Sprintf("%s/%d/%d", queryNodeMetaPrefix, nodeID, collectionID)
return kv.Save(key, infoBytes)
return kv.Save(key, string(infoBytes))
}
func removeNodeCollectionInfo(collectionID UniqueID, nodeID int64, kv *etcdkv.EtcdKV) error {
......
......@@ -103,7 +103,7 @@ func (h *historical) watchGlobalSegmentMeta() {
zap.Any("segmentID", segmentID),
)
segmentInfo := &querypb.SegmentInfo{}
err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo)
err = proto.Unmarshal(event.Kv.Value, segmentInfo)
if err != nil {
log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
continue
......
......@@ -68,10 +68,11 @@ func TestHistorical_GlobalSealedSegments(t *testing.T) {
// watch test
go n.historical.watchGlobalSegmentMeta()
time.Sleep(100 * time.Millisecond) // for etcd latency
segmentInfoStr := proto.MarshalTextString(segmentInfo)
segmentInfoBytes, err := proto.Marshal(segmentInfo)
assert.Nil(t, err)
assert.NotNil(t, n.etcdKV)
segmentKey := segmentMetaPrefix + "/" + strconv.FormatInt(segmentID, 10)
err := n.etcdKV.Save(segmentKey, segmentInfoStr)
err = n.etcdKV.Save(segmentKey, string(segmentInfoBytes))
assert.NoError(t, err)
time.Sleep(200 * time.Millisecond) // for etcd latency
......
......@@ -123,7 +123,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.UnmarshalText(value, segmentInfo)
err = proto.Unmarshal([]byte(value), segmentInfo)
if err != nil {
deleteSegment(segment)
log.Warn("error when unmarshal segment info from etcd", zap.Any("error", err.Error()))
......@@ -132,7 +132,14 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
}
segmentInfo.SegmentState = querypb.SegmentState_sealed
newKey := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID)
err = loader.etcdKV.Save(newKey, proto.MarshalTextString(segmentInfo))
newValue, err := proto.Marshal(segmentInfo)
if err != nil {
deleteSegment(segment)
log.Warn("error when marshal segment info", zap.Error(err))
segmentGC()
return err
}
err = loader.etcdKV.Save(newKey, string(newValue))
if err != nil {
deleteSegment(segment)
log.Warn("error when update segment info to etcd", zap.Any("error", err.Error()))
......
......@@ -102,8 +102,9 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, defaultSegmentID)
segmentInfo := &querypb.SegmentInfo{}
value := proto.MarshalTextString(segmentInfo)
err = kv.Save(key, value)
value, err := proto.Marshal(segmentInfo)
assert.Nil(t, err)
err = kv.Save(key, string(value))
assert.NoError(t, err)
err = loader.loadSegment(req, true)
......@@ -139,8 +140,9 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, defaultSegmentID)
segmentInfo := &querypb.SegmentInfo{}
value := proto.MarshalTextString(segmentInfo)
err = kv.Save(key, value)
value, err := proto.Marshal(segmentInfo)
assert.Nil(t, err)
err = kv.Save(key, string(value))
assert.NoError(t, err)
err = loader.loadSegment(req, true)
......
......@@ -249,12 +249,16 @@ func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
defer mt.tenantLock.Unlock()
k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
v, _ := proto.Marshal(te)
v, err := proto.Marshal(te)
if err != nil {
log.Error("AddTenant Marshal fail", zap.Error(err))
return err
}
err := mt.client.Save(k, string(v), ts)
err = mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
log.Error("AddTenant Save fail", zap.Error(err))
return err
}
mt.tenantID2Meta[te.ID] = *te
return nil
......@@ -266,9 +270,13 @@ func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
defer mt.proxyLock.Unlock()
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
v, _ := proto.Marshal(po)
v, err := proto.Marshal(po)
if err != nil {
log.Error("AddProxy Marshal fail", zap.Error(err))
return err
}
err := mt.client.Save(k, string(v), ts)
err = mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
......@@ -302,7 +310,12 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
for _, i := range idx {
k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
v, _ := proto.Marshal(i)
v, err := proto.Marshal(i)
if err != nil {
log.Error("MetaTable AddCollection Marshal fail", zap.String("key", k),
zap.String("IndexName", i.IndexName), zap.Error(err))
return fmt.Errorf("MetaTable AddCollection Marshal fail key:%s, err:%w", k, err)
}
meta[k] = string(v)
}
......@@ -316,7 +329,12 @@ func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1, _ := proto.Marshal(coll)
v1, err := proto.Marshal(coll)
if err != nil {
log.Error("MetaTable AddCollection saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddCollection saveColl Marshal fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
}
......@@ -588,7 +606,12 @@ func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1, _ := proto.Marshal(&coll)
v1, err := proto.Marshal(&coll)
if err != nil {
log.Error("MetaTable AddPartition saveColl Marshal fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddPartition saveColl Marshal fail, k1:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
......@@ -730,9 +753,14 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
delete(mt.partID2SegID, partID)
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
v, _ := proto.Marshal(&collMeta)
v, err := proto.Marshal(&collMeta)
if err != nil {
log.Error("MetaTable DeletePartition Marshal collectionMeta fail",
zap.String("key", k), zap.Error(err))
return 0, fmt.Errorf("MetaTable DeletePartition Marshal collectionMeta fail key:%s, err:%w", k, err)
}
meta := map[string]string{k: string(v)}
delMetaKeys := []string{}
var delMetaKeys []string
for _, idxInfo := range collMeta.FieldIndexes {
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
delMetaKeys = append(delMetaKeys, k)
......@@ -741,7 +769,7 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
// save ddOpStr into etcd
addition := mt.getAdditionKV(ddOpStr, meta)
err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
if err != nil {
log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
......@@ -793,9 +821,14 @@ func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times
mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
v, _ := proto.Marshal(segIdxInfo)
v, err := proto.Marshal(segIdxInfo)
if err != nil {
log.Error("MetaTable AddIndex Marshal segIdxInfo fail",
zap.String("key", k), zap.Error(err))
return fmt.Errorf("MetaTable AddIndex Marshal segIdxInfo fail key:%s, err:%w", k, err)
}
err := mt.client.Save(k, string(v), ts)
err = mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
......@@ -852,7 +885,12 @@ func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeuti
collMeta.FieldIndexes = fieldIdxInfo
mt.collID2Meta[collID] = collMeta
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
v, _ := proto.Marshal(&collMeta)
v, err := proto.Marshal(&collMeta)
if err != nil {
log.Error("MetaTable DropIndex Marshal collMeta fail",
zap.String("key", k), zap.Error(err))
return 0, false, fmt.Errorf("MetaTable DropIndex Marshal collMeta fail key:%s, err:%w", k, err)
}
saveMeta := map[string]string{k: string(v)}
delete(mt.indexID2Meta, dropIdxID)
......@@ -1039,11 +1077,21 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
mt.collID2Meta[collMeta.ID] = collMeta
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
v1, _ := proto.Marshal(&collMeta)
v1, err := proto.Marshal(&collMeta)
if err != nil {
log.Error("MetaTable GetNotIndexedSegments Marshal collMeta fail",
zap.String("key", k1), zap.Error(err))
return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal collMeta fail key:%s, err:%w", k1, err)
}
mt.indexID2Meta[idx.IndexID] = *idxInfo
k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
v2, _ := proto.Marshal(idxInfo)
v2, err := proto.Marshal(idxInfo)
if err != nil {
log.Error("MetaTable GetNotIndexedSegments Marshal idxInfo fail",
zap.String("key", k2), zap.Error(err))
return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal idxInfo fail key:%s, err:%w", k2, err)
}
meta := map[string]string{k1: string(v1), k2: string(v2)}
if dupIdx != 0 {
......@@ -1051,7 +1099,12 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v, _ := proto.Marshal(&dupInfo)
v, err := proto.Marshal(&dupInfo)
if err != nil {
log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
zap.String("key", k), zap.Error(err))
return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
}
meta[k] = string(v)
}
err = mt.client.MultiSave(meta, ts)
......@@ -1065,14 +1118,24 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
existInfo.IndexName = idxInfo.IndexName
mt.indexID2Meta[existInfo.IndexID] = existInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
v, _ := proto.Marshal(&existInfo)
v, err := proto.Marshal(&existInfo)
if err != nil {
log.Error("MetaTable GetNotIndexedSegments Marshal existInfo fail",
zap.String("key", k), zap.Error(err))
return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal existInfo fail key:%s, err:%w", k, err)
}
meta := map[string]string{k: string(v)}
if dupIdx != 0 {
dupInfo := mt.indexID2Meta[dupIdx]
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v, _ := proto.Marshal(&dupInfo)
v, err := proto.Marshal(&dupInfo)
if err != nil {
log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
zap.String("key", k), zap.Error(err))
return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
}
meta[k] = string(v)
}
......@@ -1184,7 +1247,12 @@ func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
addition := mt.getAdditionKV(ddOpStr, meta)
saveAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AddAlias saveAlias Marshal CollectionInfo fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AddAlias saveAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
}
......@@ -1236,7 +1304,12 @@ func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, t
addition := mt.getAdditionKV(ddOpStr, meta)
alterAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
if err != nil {
log.Error("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail",
zap.String("key", k1), zap.Error(err))
return "", "", fmt.Errorf("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
}
meta[k1] = string(v1)
return k1, string(v1), nil
}
......
......@@ -73,7 +73,8 @@ func Test_MockKV(t *testing.T) {
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
value, _ := proto.Marshal(&pb.TenantMeta{})
value, err := proto.Marshal(&pb.TenantMeta{})
assert.Nil(t, err)
prefix[TenantMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
......@@ -83,7 +84,8 @@ func Test_MockKV(t *testing.T) {
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
value, _ = proto.Marshal(&pb.ProxyMeta{})
value, err = proto.Marshal(&pb.ProxyMeta{})
assert.Nil(t, err)
prefix[ProxyMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
......@@ -93,7 +95,8 @@ func Test_MockKV(t *testing.T) {
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
value, _ = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})
value, err = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})
assert.Nil(t, err)
prefix[CollectionMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
......@@ -103,7 +106,8 @@ func Test_MockKV(t *testing.T) {
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
value, _ = proto.Marshal(&pb.SegmentIndexInfo{})
value, err = proto.Marshal(&pb.SegmentIndexInfo{})
assert.Nil(t, err)
prefix[SegmentIndexMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
......@@ -118,7 +122,8 @@ func Test_MockKV(t *testing.T) {
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
value, _ = proto.Marshal(&pb.IndexInfo{})
value, err = proto.Marshal(&pb.IndexInfo{})
assert.Nil(t, err)
prefix[IndexMetaPrefix] = []string{string(value)}
m1, err := NewMetaTable(k1)
assert.NotNil(t, err)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册