From e771bda92fa0099a2edfc4686066629afd52ec29 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Sat, 14 Aug 2021 11:18:10 +0800 Subject: [PATCH] optimize retrieve output vector code structure (#7102) Signed-off-by: yudong.cai --- .../core/src/segcore/SegmentInterface.cpp | 1 + internal/proxy/task.go | 9 + internal/querynode/historical.go | 84 ++++++-- internal/querynode/query_collection.go | 179 ++---------------- internal/querynode/segment.go | 82 +++++++- internal/querynode/streaming.go | 43 +++++ internal/storage/vector_chunk_manager.go | 8 +- internal/storage/vector_chunk_manager_test.go | 14 +- 8 files changed, 228 insertions(+), 192 deletions(-) diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index e282b02ae..cc014f2a6 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -143,6 +143,7 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_ auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); + data_array->set_type(milvus::proto::schema::DataType(field_meta.get_data_type())); if (!datatype_is_vector(data_type)) { auto scalar_array = CreateScalarArrayFrom(data_raw, count, data_type); diff --git a/internal/proxy/task.go b/internal/proxy/task.go index bc3194796..bad2c8c19 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -1262,6 +1262,15 @@ func (dct *DropCollectionTask) PostExecute(ctx context.Context) error { return nil } +// Support wildcard in output fields: +// "*" - all scalar fields +// "%" - all vector fields +// For example, A and B are scalar fields, C and D are vector fields, duplicated fields will automatically be removed. +// output_fields=["*"] ==> [A,B] +// output_fields=["%"] ==> [C,D] +// output_fields=["*","%"] ==> [A,B,C,D] +// output_fields=["*",A] ==> [A,B] +// output_fields=["*",C] ==> [A,B,C] func translateOutputFields(outputFields []string, schema *schemapb.CollectionSchema, addPrimary bool) ([]string, error) { var primaryFieldName string scalarFieldNameMap := make(map[string]bool) diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index 23ea0bf2c..e6fdb5479 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -27,6 +27,8 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" ) @@ -177,25 +179,71 @@ func (h *historical) removeGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueI } } -func (h *historical) search(searchReqs []*searchRequest, - collID UniqueID, - partIDs []UniqueID, - plan *SearchPlan, - searchTs Timestamp) ([]*SearchResult, []*Segment, error) { +func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm *storage.VectorChunkManager, + plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, error) { + + retrieveResults := make([]*segcorepb.RetrieveResults, 0) + retrieveSegmentIDs := make([]UniqueID, 0) + + // get historical partition ids + var retrievePartIDs []UniqueID + if len(partIDs) == 0 { + hisPartIDs, err := h.replica.getPartitionIDs(collID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + retrievePartIDs = hisPartIDs + } else { + for _, id := range partIDs { + _, err := h.replica.getPartitionByID(id) + if err == nil { + retrievePartIDs = append(retrievePartIDs, id) + } + } + } + + col, err := h.replica.getCollectionByID(collID) + if err != nil { + return nil, nil, err + } + + for _, partID := range retrievePartIDs { + segIDs, err := h.replica.getSegmentIDs(partID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + for _, segID := range segIDs { + seg, err := h.replica.getSegmentByID(segID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + result, err := seg.getEntityByIds(plan) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + + if err = seg.fillVectorFieldsData(collID, col.schema, vcm, result); err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + retrieveResults = append(retrieveResults, result) + retrieveSegmentIDs = append(retrieveSegmentIDs, segID) + } + } + return retrieveResults, retrieveSegmentIDs, nil +} + +func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID, plan *SearchPlan, + searchTs Timestamp) ([]*SearchResult, []UniqueID, error) { searchResults := make([]*SearchResult, 0) - segmentResults := make([]*Segment, 0) + searchSegmentIDs := make([]UniqueID, 0) // get historical partition ids var searchPartIDs []UniqueID if len(partIDs) == 0 { hisPartIDs, err := h.replica.getPartitionIDs(collID) - if len(hisPartIDs) == 0 { - // no partitions in collection, do empty search - return nil, nil, nil - } if err != nil { - return searchResults, segmentResults, err + return searchResults, searchSegmentIDs, err } log.Debug("no partition specified, search all partitions", zap.Any("collectionID", collID), @@ -223,9 +271,7 @@ func (h *historical) search(searchReqs []*searchRequest, // all partitions have been released if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition { return nil, nil, errors.New("partitions have been released , collectionID = " + - fmt.Sprintln(collID) + - "target partitionIDs = " + - fmt.Sprintln(partIDs)) + fmt.Sprintln(collID) + "target partitionIDs = " + fmt.Sprintln(partIDs)) } if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection { @@ -244,24 +290,24 @@ func (h *historical) search(searchReqs []*searchRequest, for _, partID := range searchPartIDs { segIDs, err := h.replica.getSegmentIDs(partID) if err != nil { - return searchResults, segmentResults, err + return searchResults, searchSegmentIDs, err } for _, segID := range segIDs { seg, err := h.replica.getSegmentByID(segID) if err != nil { - return searchResults, segmentResults, err + return searchResults, searchSegmentIDs, err } if !seg.getOnService() { continue } searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) if err != nil { - return searchResults, segmentResults, err + return searchResults, searchSegmentIDs, err } searchResults = append(searchResults, searchResult) - segmentResults = append(segmentResults, seg) + searchSegmentIDs = append(searchSegmentIDs, seg.segmentID) } } - return searchResults, segmentResults, nil + return searchResults, searchSegmentIDs, nil } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 027652375..ad170421c 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -12,7 +12,6 @@ package querynode import ( - "bytes" "context" "encoding/binary" "fmt" @@ -28,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -843,18 +841,14 @@ func (q *queryCollection) search(msg queryMsg) error { } searchResults := make([]*SearchResult, 0) - sealedSegmentSearched := make([]UniqueID, 0) // historical search - hisSearchResults, hisSegmentResults, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp) + hisSearchResults, sealedSegmentSearched, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp) if err1 != nil { log.Warn(err1.Error()) return err1 } searchResults = append(searchResults, hisSearchResults...) - for _, seg := range hisSegmentResults { - sealedSegmentSearched = append(sealedSegmentSearched, seg.segmentID) - } tr.Record("historical search done") // streaming search @@ -1045,98 +1039,6 @@ func (q *queryCollection) search(msg queryMsg) error { return nil } -func (q *queryCollection) fillVectorFieldsData(segment *Segment, result *segcorepb.RetrieveResults) error { - // If segment is growing, vector data is in memory - if segment.segmentType == segmentTypeGrowing { - log.Debug("Segment is growing, all vector data is in memory") - log.Debug("FillVectorFieldData", zap.Any("segmentType", segment.segmentType)) - return nil - } - collection, _ := q.streaming.replica.getCollectionByID(q.collectionID) - schema := &etcdpb.CollectionMeta{ - ID: q.collectionID, - Schema: collection.schema} - schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema) - if err != nil { - return err - } - for _, resultFieldData := range result.FieldsData { - log.Debug("FillVectorFieldData for fieldID", zap.Any("fieldID", resultFieldData.FieldId)) - // If the vector field doesn't have index. Vector data is in memory for - // brute force search. No need to download data from remote. - _, ok := segment.indexInfos[resultFieldData.FieldId] - if !ok { - log.Debug("FillVectorFieldData fielD doesn't have index", - zap.Any("fielD", resultFieldData.FieldId)) - continue - } - - vecFieldInfo, err := segment.getVectorFieldInfo(resultFieldData.FieldId) - if err != nil { - continue - } - dim := resultFieldData.GetVectors().GetDim() - log.Debug("FillVectorFieldData", zap.Any("dim", dim)) - fieldSchema, err := schemaHelper.GetFieldFromID(resultFieldData.FieldId) - if err != nil { - return err - } - dataType := fieldSchema.DataType - log.Debug("FillVectorFieldData", zap.Any("datatype", dataType)) - - data := resultFieldData.GetVectors().GetData() - - for i, offset := range result.Offset { - var vecPath string - for index, idBinlogRowSize := range segment.idBinlogRowSizes { - if offset < idBinlogRowSize { - vecPath = vecFieldInfo.fieldBinlog.Binlogs[index] - break - } else { - offset -= idBinlogRowSize - } - } - log.Debug("FillVectorFieldData", zap.Any("path", vecPath)) - err := q.vcm.DownloadVectorFile(vecPath, schema) - if err != nil { - return err - } - - switch dataType { - case schemapb.DataType_BinaryVector: - rowBytes := dim / 8 - content := make([]byte, rowBytes) - _, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes) - if err != nil { - return err - } - log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content)) - - resultLen := dim / 8 - copy(data.(*schemapb.VectorField_BinaryVector).BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content) - case schemapb.DataType_FloatVector: - rowBytes := dim * 4 - content := make([]byte, rowBytes) - _, err := q.vcm.ReadAt(vecPath, content, offset*rowBytes) - if err != nil { - return err - } - floatResult := make([]float32, dim) - buf := bytes.NewReader(content) - err = binary.Read(buf, binary.LittleEndian, &floatResult) - if err != nil { - return err - } - log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult)) - - resultLen := dim - copy(data.(*schemapb.VectorField_FloatVector).FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult) - } - } - } - return nil -} - func (q *queryCollection) retrieve(msg queryMsg) error { // TODO(yukun) // step 1: get retrieve object and defer destruction @@ -1170,77 +1072,30 @@ func (q *queryCollection) retrieve(msg queryMsg) error { tr := timerecord.NewTimeRecorder(fmt.Sprintf("retrieve %d", retrieveMsg.CollectionID)) var globalSealedSegments []UniqueID - var partitionIDsInHistorical []UniqueID - var partitionIDsInStreaming []UniqueID - partitionIDsInQuery := retrieveMsg.PartitionIDs - if len(partitionIDsInQuery) == 0 { - globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID) - partitionIDsInHistoricalCol, err1 := q.historical.replica.getPartitionIDs(collectionID) - partitionIDsInStreamingCol, err2 := q.streaming.replica.getPartitionIDs(collectionID) - if err1 != nil && err2 != nil { - return err2 - } - partitionIDsInHistorical = partitionIDsInHistoricalCol - partitionIDsInStreaming = partitionIDsInStreamingCol + if len(retrieveMsg.PartitionIDs) > 0 { + globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(retrieveMsg.PartitionIDs) } else { - globalSealedSegments = q.historical.getGlobalSegmentIDsByPartitionIds(partitionIDsInQuery) - for _, id := range partitionIDsInQuery { - _, err1 := q.historical.replica.getPartitionByID(id) - if err1 == nil { - partitionIDsInHistorical = append(partitionIDsInHistorical, id) - } - _, err2 := q.streaming.replica.getPartitionByID(id) - if err2 == nil { - partitionIDsInStreaming = append(partitionIDsInStreaming, id) - } - if err1 != nil && err2 != nil { - return err2 - } - } + globalSealedSegments = q.historical.getGlobalSegmentIDsByCollectionID(collectionID) } - sealedSegmentRetrieved := make([]UniqueID, 0) + var mergeList []*segcorepb.RetrieveResults - for _, partitionID := range partitionIDsInHistorical { - segmentIDs, err := q.historical.replica.getSegmentIDs(partitionID) - if err != nil { - return err - } - for _, segmentID := range segmentIDs { - segment, err := q.historical.replica.getSegmentByID(segmentID) - if err != nil { - return err - } - result, err := segment.getEntityByIds(plan) - if err != nil { - return err - } - if err = q.fillVectorFieldsData(segment, result); err != nil { - return err - } - mergeList = append(mergeList, result) - sealedSegmentRetrieved = append(sealedSegmentRetrieved, segmentID) - } + // historical retrieve + hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vcm, plan) + if err1 != nil { + log.Warn(err1.Error()) + return err1 } + mergeList = append(mergeList, hisRetrieveResults...) tr.Record("historical retrieve done") - for _, partitionID := range partitionIDsInStreaming { - segmentIDs, err := q.streaming.replica.getSegmentIDs(partitionID) - if err != nil { - return err - } - for _, segmentID := range segmentIDs { - segment, err := q.streaming.replica.getSegmentByID(segmentID) - if err != nil { - return err - } - result, err := segment.getEntityByIds(plan) - if err != nil { - return err - } - mergeList = append(mergeList, result) - } + // streaming retrieve + strRetrieveResults, _, err2 := q.streaming.retrieve(collectionID, retrieveMsg.PartitionIDs, plan) + if err2 != nil { + log.Warn(err2.Error()) + return err2 } + mergeList = append(mergeList, strRetrieveResults...) tr.Record("streaming retrieve done") result, err := mergeRetrieveResults(mergeList) diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 018dbe8c4..713f3610a 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -12,9 +12,7 @@ package querynode /* - #cgo CFLAGS: -I${SRCDIR}/../core/output/include - #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #include "segcore/collection_c.h" @@ -23,6 +21,8 @@ package querynode */ import "C" import ( + "bytes" + "encoding/binary" "errors" "fmt" "strconv" @@ -35,7 +35,9 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/storage" ) type segmentType int32 @@ -316,6 +318,82 @@ func (s *Segment) getEntityByIds(plan *RetrievePlan) (*segcorepb.RetrieveResults return result, nil } +func (s *Segment) fillVectorFieldsData(collectionID UniqueID, schema *schemapb.CollectionSchema, + vcm *storage.VectorChunkManager, result *segcorepb.RetrieveResults) error { + + for _, fieldData := range result.FieldsData { + log.Debug("FillVectorFieldData for fieldID", zap.Any("fieldID", fieldData.FieldId)) + // If the vector field doesn't have index. Vector data is in memory for + // brute force search. No need to download data from remote. + _, ok := s.indexInfos[fieldData.FieldId] + if !ok { + log.Debug("FillVectorFieldData field doesn't have index", + zap.Any("field", fieldData.FieldId)) + continue + } + + vecFieldInfo, err := s.getVectorFieldInfo(fieldData.FieldId) + if err != nil { + continue + } + log.Debug("FillVectorFieldData", zap.Any("fieldId", fieldData.FieldId)) + + dim := fieldData.GetVectors().GetDim() + log.Debug("FillVectorFieldData", zap.Int64("dim", dim), zap.Any("datatype", fieldData.Type)) + + for i, offset := range result.Offset { + var vecPath string + for index, idBinlogRowSize := range s.idBinlogRowSizes { + if offset < idBinlogRowSize { + vecPath = vecFieldInfo.fieldBinlog.Binlogs[index] + break + } else { + offset -= idBinlogRowSize + } + } + log.Debug("FillVectorFieldData", zap.Any("path", vecPath)) + err := vcm.DownloadVectorFile(vecPath, collectionID, schema) + if err != nil { + return err + } + + switch fieldData.Type { + case schemapb.DataType_BinaryVector: + rowBytes := dim / 8 + x := fieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector) + content := make([]byte, rowBytes) + _, err := vcm.ReadAt(vecPath, content, offset*rowBytes) + if err != nil { + return err + } + log.Debug("FillVectorFieldData", zap.Any("binaryVectorResult", content)) + + resultLen := dim / 8 + copy(x.BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content) + case schemapb.DataType_FloatVector: + x := fieldData.GetVectors().GetData().(*schemapb.VectorField_FloatVector) + rowBytes := dim * 4 + content := make([]byte, rowBytes) + _, err := vcm.ReadAt(vecPath, content, offset*rowBytes) + if err != nil { + return err + } + floatResult := make([]float32, dim) + buf := bytes.NewReader(content) + err = binary.Read(buf, binary.LittleEndian, &floatResult) + if err != nil { + return err + } + log.Debug("FillVectorFieldData", zap.Any("floatVectorResult", floatResult)) + + resultLen := dim + copy(x.FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult) + } + } + } + return nil +} + //-------------------------------------------------------------------------------------- index info interface func (s *Segment) setIndexName(fieldID int64, name string) error { s.paramMutex.Lock() diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index e1c2dd0c9..a954ddb13 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -21,6 +21,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/util/tsoutil" ) @@ -61,6 +62,48 @@ func (s *streaming) close() { s.replica.freeAll() } +func (s *streaming) retrieve(collID UniqueID, partIDs []UniqueID, plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, error) { + retrieveResults := make([]*segcorepb.RetrieveResults, 0) + retrieveSegmentIDs := make([]UniqueID, 0) + + var retrievePartIDs []UniqueID + if len(partIDs) == 0 { + strPartIDs, err := s.replica.getPartitionIDs(collID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + retrievePartIDs = strPartIDs + } else { + for _, id := range partIDs { + _, err := s.replica.getPartitionByID(id) + if err == nil { + retrievePartIDs = append(retrievePartIDs, id) + } + } + } + + for _, partID := range retrievePartIDs { + segIDs, err := s.replica.getSegmentIDs(partID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + for _, segID := range segIDs { + seg, err := s.replica.getSegmentByID(segID) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + result, err := seg.getEntityByIds(plan) + if err != nil { + return retrieveResults, retrieveSegmentIDs, err + } + + retrieveResults = append(retrieveResults, result) + retrieveSegmentIDs = append(retrieveSegmentIDs, segID) + } + } + return retrieveResults, retrieveSegmentIDs, nil +} + func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID, vChannel Channel, plan *SearchPlan, searchTs Timestamp) ([]*SearchResult, error) { diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index 286923419..40ea16cdd 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -17,6 +17,7 @@ import ( "errors" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" ) type VectorChunkManager struct { @@ -31,11 +32,14 @@ func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager Ch } } -func (vcm *VectorChunkManager) DownloadVectorFile(key string, schema *etcdpb.CollectionMeta) error { +func (vcm *VectorChunkManager) DownloadVectorFile(key string, collectionID UniqueID, schema *schemapb.CollectionSchema) error { if vcm.localChunkManager.Exist(key) { return nil } - insertCodec := NewInsertCodec(schema) + insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{ + ID: collectionID, + Schema: schema, + }) content, err := vcm.remoteChunkManager.Read(key) if err != nil { return err diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index 168b4bc3f..50dd47ae9 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -44,19 +44,19 @@ func TestVectorChunkManager(t *testing.T) { lcm := NewLocalChunkManager(localPath) - schema := initSchema() + meta := initMeta() vcm := NewVectorChunkManager(lcm, rcm) assert.NotNil(t, vcm) - binlogs := initBinlogFile(schema) + binlogs := initBinlogFile(meta) assert.NotNil(t, binlogs) for _, binlog := range binlogs { rcm.Write(binlog.Key, binlog.Value) } - err = vcm.DownloadVectorFile("108", schema) + err = vcm.DownloadVectorFile("108", meta.ID, meta.Schema) assert.Nil(t, err) - err = vcm.DownloadVectorFile("109", schema) + err = vcm.DownloadVectorFile("109", meta.ID, meta.Schema) assert.Nil(t, err) content, err := vcm.Read("108") @@ -107,8 +107,8 @@ func newMinIOKVClient(ctx context.Context, bucketName string) (*miniokv.MinIOKV, return client, err } -func initSchema() *etcdpb.CollectionMeta { - schema := &etcdpb.CollectionMeta{ +func initMeta() *etcdpb.CollectionMeta { + meta := &etcdpb.CollectionMeta{ ID: 1, CreateTime: 1, SegmentIDs: []int64{0, 1}, @@ -156,7 +156,7 @@ func initSchema() *etcdpb.CollectionMeta { }, }, } - return schema + return meta } func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob { -- GitLab