diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 1cd007902016ca804e4ae7738df6602ace89a012..bf9c3e02a98fa0988ae60ac6741e73e66be2fbdc 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -30,6 +30,7 @@ import (
 */
 type collectionReplica interface {
     // collection
+    getCollectionIDs() []UniqueID
     addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
     removeCollection(collectionID UniqueID) error
     getCollectionByID(collectionID UniqueID) (*Collection, error)
@@ -37,7 +38,8 @@ type collectionReplica interface {
     getCollectionNum() int
     getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
-    getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error)
+    getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
+    getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)

     // partition
     addPartition(collectionID UniqueID, partitionID UniqueID) error
@@ -47,9 +49,8 @@ type collectionReplica interface {
     getPartitionNum() int
     getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)

-    enablePartitionDM(partitionID UniqueID) error
-    disablePartitionDM(partitionID UniqueID) error
-    getEnablePartitionDM(partitionID UniqueID) (bool, error)
+    enablePartition(partitionID UniqueID) error
+    disablePartition(partitionID UniqueID) error

     // segment
     addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
@@ -59,7 +60,7 @@ type collectionReplica interface {
     getSegmentNum() int
     getSegmentStatistics() []*internalpb2.SegmentStats

-    getSealedSegments() ([]UniqueID, []UniqueID)
+    getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
     replaceGrowingSegmentBySealedSegment(segment *Segment) error

     getTSafe() tSafe
@@ -76,6 +77,16 @@ type collectionReplicaImpl struct {
 }

 //----------------------------------------------------------------------------------------------------- collection
+func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
+    colReplica.mu.RLock()
+    defer colReplica.mu.RUnlock()
+    collectionIDs := make([]UniqueID, 0)
+    for id := range colReplica.collections {
+        collectionIDs = append(collectionIDs, id)
+    }
+    return collectionIDs
+}
+
 func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
     colReplica.mu.Lock()
     defer colReplica.mu.Unlock()
@@ -158,29 +169,59 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
     return collection.partitionIDs, nil
 }

-func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
     colReplica.mu.RLock()
     defer colReplica.mu.RUnlock()

-    collection, err := colReplica.getCollectionByIDPrivate(collectionID)
+    fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
     if err != nil {
         return nil, err
     }

     vecFields := make([]int64, 0)
-    for _, field := range collection.Schema().Fields {
+    for _, field := range fields {
         if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT {
             vecFields = append(vecFields, field.FieldID)
         }
     }

     if len(vecFields) <= 0 {
-        return nil, errors.New("no vector field in segment " + strconv.FormatInt(collectionID, 10))
+        return nil, errors.New("no vector field in collection " + strconv.FormatInt(collectionID, 10))
     }
     return vecFields, nil
 }

+func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+    colReplica.mu.RLock()
+    defer colReplica.mu.RUnlock()
+
+    fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
+    if err != nil {
+        return nil, err
+    }
+
+    targetFields := make([]int64, 0)
+    for _, field := range fields {
+        targetFields = append(targetFields, field.FieldID)
+    }
+
+    return targetFields, nil
+}
+
+func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
+    collection, err := colReplica.getCollectionByIDPrivate(collectionID)
+    if err != nil {
+        return nil, err
+    }
+
+    if len(collection.Schema().Fields) <= 0 {
+        return nil, errors.New("no field in collection " + strconv.FormatInt(collectionID, 10))
+    }
+
+    return collection.Schema().Fields, nil
+}
+
 //----------------------------------------------------------------------------------------------------- partition
 func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
     colReplica.mu.Lock()
@@ -263,7 +304,10 @@ func (colReplica *collectionReplicaImpl) getPartitionNum() int {
 func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
     colReplica.mu.RLock()
     defer colReplica.mu.RUnlock()
+    return colReplica.getSegmentIDsPrivate(partitionID)
+}

+func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
     partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
     if err2 != nil {
         return nil, err2
@@ -271,7 +315,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]
     return partition.segmentIDs, nil
 }

-func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID) error {
+func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
     colReplica.mu.Lock()
     defer colReplica.mu.Unlock()

@@ -280,11 +324,11 @@ func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID)
         return err
     }

-    partition.enableDM = true
+    partition.enable = true
     return nil
 }

-func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID) error {
+func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
     colReplica.mu.Lock()
     defer colReplica.mu.Unlock()

@@ -293,19 +337,18 @@ func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID)
         return err
     }

-    partition.enableDM = false
+    partition.enable = false
     return nil
 }

-func (colReplica *collectionReplicaImpl) getEnablePartitionDM(partitionID UniqueID) (bool, error) {
-    colReplica.mu.Lock()
-    defer colReplica.mu.Unlock()
-
-    partition, err := colReplica.getPartitionByIDPrivate(partitionID)
-    if err != nil {
-        return false, err
+func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
+    partitionIDs := make([]UniqueID, 0)
+    for _, partition := range colReplica.partitions {
+        if partition.enable {
+            partitionIDs = append(partitionIDs, partition.partitionID)
+        }
     }
-    return partition.enableDM, nil
+    return partitionIDs
 }

 //----------------------------------------------------------------------------------------------------- segment
@@ -414,20 +457,33 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
     return statisticData
 }

-func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []UniqueID) {
+func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
     colReplica.mu.RLock()
     defer colReplica.mu.RUnlock()

-    collectionIDs := make([]UniqueID, 0)
-    segmentIDs := make([]UniqueID, 0)
-    for k, v := range colReplica.segments {
-        if v.getType() == segTypeSealed {
-            collectionIDs = append(collectionIDs, v.collectionID)
-            segmentIDs = append(segmentIDs, k)
+    targetCollectionIDs := make([]UniqueID, 0)
+    targetPartitionIDs := make([]UniqueID, 0)
+    targetSegmentIDs := make([]UniqueID, 0)
+
+    for _, partitionID := range colReplica.getEnabledPartitionIDsPrivate() {
+        segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID)
+        if err != nil {
+            continue
+        }
+        for _, segmentID := range segmentIDs {
+            segment, err := colReplica.getSegmentByIDPrivate(segmentID)
+            if err != nil {
+                continue
+            }
+            if segment.getType() == segType {
+                targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
+                targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
+                targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
+            }
         }
     }
-    return collectionIDs, segmentIDs
+    return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }

 func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
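Note: `getEnabledSealedSegmentsBySegmentType` returns three parallel slices; index `i` refers to the same segment in all of them. A minimal caller sketch under that convention (the `printSegmentInfo` helper is hypothetical, not part of this patch; it assumes the `collectionReplica` interface and `segTypeSealed` constant above, plus an `fmt` import):

```go
// Hypothetical caller illustrating the parallel-slice return convention:
// collectionIDs[i], partitionIDs[i] and segmentIDs[i] describe one segment.
func printSegmentInfo(replica collectionReplica) {
	collectionIDs, partitionIDs, segmentIDs := replica.getEnabledSealedSegmentsBySegmentType(segTypeSealed)
	for i := range segmentIDs {
		fmt.Printf("segment %d (collection %d, partition %d) is enabled and sealed\n",
			segmentIDs[i], collectionIDs[i], partitionIDs[i])
	}
}
```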
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
new file mode 100644
index 0000000000000000000000000000000000000000..207386ad5c561dd25d81cd02f0c0c976ed4c3b5a
--- /dev/null
+++ b/internal/querynode/index_loader.go
@@ -0,0 +1,409 @@
+package querynode
+
+import (
+    "context"
+    "errors"
+    "fmt"
+    "log"
+    "path"
+    "sort"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/zilliztech/milvus-distributed/internal/kv"
+    minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+    "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
+    "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+    "github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
+    "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+    "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+    "github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+type indexLoader struct {
+    replica collectionReplica
+
+    fieldIndexes   map[string][]*internalpb2.IndexStats
+    fieldStatsChan chan []*internalpb2.FieldStats
+
+    masterClient MasterServiceInterface
+    indexClient  IndexServiceInterface
+
+    kv kv.Base // minio kv
+}
+
+type loadIndex struct {
+    segmentID  UniqueID
+    fieldID    int64
+    indexPaths []string
+}
+
+func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
+    collectionIDs, _, segmentIDs := loader.replica.getEnabledSealedSegmentsBySegmentType(segTypeSealed)
+    if len(collectionIDs) <= 0 {
+        wg.Done()
+        return
+    }
+    fmt.Println("do load index for sealed segments:", segmentIDs)
+    for i := range collectionIDs {
+        // we don't need index id yet
+        _, buildID, err := loader.getIndexInfo(collectionIDs[i], segmentIDs[i])
+        if err != nil {
+            indexPaths, err := loader.getIndexPaths(buildID)
+            if err != nil {
+                log.Println(err)
+                continue
+            }
+            err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
+            if err != nil {
+                log.Println(err)
+                continue
+            }
+        }
+    }
+    // sendQueryNodeStats
+    err := loader.sendQueryNodeStats()
+    if err != nil {
+        log.Println(err)
+        wg.Done()
+        return
+    }
+
+    wg.Done()
+}
+
+func (loader *indexLoader) execute(l *loadIndex) error {
+    // 1. use msg's index paths to get index bytes
+    var err error
+    var indexBuffer [][]byte
+    var indexParams indexParam
+    var indexName string
+    var indexID UniqueID
+    fn := func() error {
+        indexBuffer, indexParams, indexName, indexID, err = loader.loadIndex(l.indexPaths)
+        if err != nil {
+            return err
+        }
+        return nil
+    }
+    err = util.Retry(5, time.Millisecond*200, fn)
+    if err != nil {
+        return err
+    }
+    ok, err := loader.checkIndexReady(indexParams, l)
+    if err != nil {
+        return err
+    }
+    if ok {
+        // index already loaded and matches; signal callers with an empty error
+        return errors.New("")
+    }
+    // 2. use index bytes and index path to update segment
+    err = loader.updateSegmentIndex(indexParams, indexBuffer, l)
+    if err != nil {
+        return err
+    }
+    // 3. update segment index stats
+    err = loader.updateSegmentIndexStats(indexParams, indexName, indexID, l)
+    if err != nil {
+        return err
+    }
+    fmt.Println("load index done")
+    return nil
+}
+
+func (loader *indexLoader) printIndexParams(index []*commonpb.KeyValuePair) {
+    fmt.Println("=================================================")
+    for i := 0; i < len(index); i++ {
+        fmt.Println(index[i])
+    }
+}
+
+func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
+    if len(index1) != len(index2) {
+        return false
+    }
+
+    for i := 0; i < len(index1); i++ {
+        kv1 := *index1[i]
+        kv2 := *index2[i]
+        if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
+            return false
+        }
+    }
+
+    return true
+}
+
+func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
+    return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
+}
+
+func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
+    ids := strings.Split(key, "/")
+    if len(ids) != 2 {
+        return 0, 0, errors.New("illegal fieldsStatsKey")
+    }
+    collectionID, err := strconv.ParseInt(ids[0], 10, 64)
+    if err != nil {
+        return 0, 0, err
+    }
+    fieldID, err := strconv.ParseInt(ids[1], 10, 64)
+    if err != nil {
+        return 0, 0, err
+    }
+    return collectionID, fieldID, nil
+}
+
+func (loader *indexLoader) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
+    targetSegment, err := loader.replica.getSegmentByID(l.segmentID)
+    if err != nil {
+        return err
+    }
+
+    fieldStatsKey := loader.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
+    _, ok := loader.fieldIndexes[fieldStatsKey]
+    newIndexParams := make([]*commonpb.KeyValuePair, 0)
+    for k, v := range indexParams {
+        newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
+            Key:   k,
+            Value: v,
+        })
+    }
+
+    // sort index params by key
+    sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
+    if !ok {
+        loader.fieldIndexes[fieldStatsKey] = make([]*internalpb2.IndexStats, 0)
+        loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
+            &internalpb2.IndexStats{
+                IndexParams:        newIndexParams,
+                NumRelatedSegments: 1,
+            })
+    } else {
+        isNewIndex := true
+        for _, index := range loader.fieldIndexes[fieldStatsKey] {
+            if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
+                index.NumRelatedSegments++
+                isNewIndex = false
+            }
+        }
+        if isNewIndex {
+            loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
+                &internalpb2.IndexStats{
+                    IndexParams:        newIndexParams,
+                    NumRelatedSegments: 1,
+                })
+        }
+    }
+    err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
+    if err != nil {
+        return err
+    }
+    targetSegment.setIndexName(indexName)
+    targetSegment.setIndexID(indexID)
+
+    return nil
+}
+
+func (loader *indexLoader) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
+    index := make([][]byte, 0)
+
+    var indexParams indexParam
+    var indexName string
+    var indexID UniqueID
+    for _, p := range indexPath {
+        fmt.Println("load path = ", indexPath)
+        indexPiece, err := loader.kv.Load(p)
+        if err != nil {
+            return nil, nil, "", -1, err
+        }
+        // get index params when detecting indexParamPrefix
+        if path.Base(p) == storage.IndexParamsFile {
+            indexCodec := storage.NewIndexCodec()
+            _, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
+                {
+                    Key:   storage.IndexParamsFile,
+                    Value: []byte(indexPiece),
+                },
+            })
+            if err != nil {
+                return nil, nil, "", -1, err
+            }
+        } else {
+            index = append(index, []byte(indexPiece))
+        }
+    }
+
+    if len(indexParams) <= 0 {
+        return nil, nil, "", -1, errors.New("cannot find index param")
+    }
+    return index, indexParams, indexName, indexID, nil
+}
+
+func (loader *indexLoader) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
+    segment, err := loader.replica.getSegmentByID(l.segmentID)
+    if err != nil {
+        return err
+    }
+
+    loadIndexInfo, err := newLoadIndexInfo()
+    defer deleteLoadIndexInfo(loadIndexInfo)
+    if err != nil {
+        return err
+    }
+    err = loadIndexInfo.appendFieldInfo(l.fieldID)
+    if err != nil {
+        return err
+    }
+    for k, v := range indexParams {
+        err = loadIndexInfo.appendIndexParam(k, v)
+        if err != nil {
+            return err
+        }
+    }
+    err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
+    if err != nil {
+        return err
+    }
+    return segment.updateSegmentIndex(loadIndexInfo)
+}
+
+func (loader *indexLoader) sendQueryNodeStats() error {
+    resultFieldsStats := make([]*internalpb2.FieldStats, 0)
+    for fieldStatsKey, indexStats := range loader.fieldIndexes {
+        colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey)
+        if err != nil {
+            return err
+        }
+        fieldStats := internalpb2.FieldStats{
+            CollectionID: colID,
+            FieldID:      fieldID,
+            IndexStats:   indexStats,
+        }
+        resultFieldsStats = append(resultFieldsStats, &fieldStats)
+    }
+
+    loader.fieldStatsChan <- resultFieldsStats
+    fmt.Println("sent field stats")
+    return nil
+}
+
+func (loader *indexLoader) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
+    segment, err := loader.replica.getSegmentByID(l.segmentID)
+    if err != nil {
+        return false, err
+    }
+    if !segment.matchIndexParam(l.fieldID, indexParams) {
+        return false, nil
+    }
+    return true, nil
+}
+
+func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
+    req := &milvuspb.DescribeSegmentRequest{
+        Base: &commonpb.MsgBase{
+            MsgType: commonpb.MsgType_kDescribeSegment,
+        },
+        CollectionID: collectionID,
+        SegmentID:    segmentID,
+    }
+    response, err := loader.masterClient.DescribeSegment(req)
+    if err != nil {
+        return 0, 0, err
+    }
+    return response.IndexID, response.BuildID, nil
+}
+
+func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
+    if loader.indexClient == nil {
+        return nil, errors.New("null index service client")
+    }
+
+    indexFilePathRequest := &indexpb.IndexFilePathsRequest{
+        // TODO: rename indexIDs to buildIDs
+        IndexBuildIDs: []UniqueID{indexBuildID},
+    }
+    pathResponse, err := loader.indexClient.GetIndexFilePaths(indexFilePathRequest)
+    if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+        return nil, err
+    }
+    if len(pathResponse.FilePaths) <= 0 {
+        return nil, errors.New("illegal index file paths")
+    }
+
+    return pathResponse.FilePaths[0].IndexFilePaths, nil
+}
+
+func (loader *indexLoader) loadIndexImmediate(segment *Segment, indexPaths []string) error {
+    // get vector field ids from schema to load index
+    vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(segment.collectionID)
+    if err != nil {
+        return err
+    }
+    for _, id := range vecFieldIDs {
+        l := &loadIndex{
+            segmentID:  segment.ID(),
+            fieldID:    id,
+            indexPaths: indexPaths,
+        }
+
+        err = loader.execute(l)
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
+    // get vector field ids from schema to load index
+    vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionID)
+    if err != nil {
+        return err
+    }
+    for _, id := range vecFieldIDs {
+        l := &loadIndex{
+            segmentID:  segmentID,
+            fieldID:    id,
+            indexPaths: indexPaths,
+        }
+
+        err = loader.execute(l)
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
+    option := &minioKV.Option{
+        Address:           Params.MinioEndPoint,
+        AccessKeyID:       Params.MinioAccessKeyID,
+        SecretAccessKeyID: Params.MinioSecretAccessKey,
+        UseSSL:            Params.MinioUseSSLStr,
+        CreateBucket:      true,
+        BucketName:        Params.MinioBucketName,
+    }
+
+    client, err := minioKV.NewMinIOKV(ctx, option)
+    if err != nil {
+        panic(err)
+    }
+
+    return &indexLoader{
+        replica: replica,
+
+        fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
+        fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
+
+        masterClient: masterClient,
+        indexClient:  indexClient,
+
+        kv: client,
+    }
+}
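Note: `fieldsStatsIDs2Key` and `fieldsStatsKey2IDs` define the key format of the `fieldIndexes` map (`<collectionID>/<fieldID>`). A round-trip sketch in the style of the existing tests (this test function is hypothetical; it assumes `UniqueID` is the usual `int64` alias and that `testify` is available, as in load_service_test.go):

```go
// Hypothetical test: the fieldIndexes map key must round-trip through
// fieldsStatsIDs2Key / fieldsStatsKey2IDs without loss.
func TestFieldsStatsKeyRoundTrip(t *testing.T) {
	loader := &indexLoader{} // neither helper touches the struct fields
	key := loader.fieldsStatsIDs2Key(101, 7)
	assert.Equal(t, "101/7", key)

	colID, fieldID, err := loader.fieldsStatsKey2IDs(key)
	assert.NoError(t, err)
	assert.Equal(t, int64(101), colID)
	assert.Equal(t, int64(7), fieldID)
}
```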
diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go
index 076bd57cbd6188be064c96094bb3d9fd3cd0d440..1f3268399aef4c8218e1b3112e096838a12da7f4 100644
--- a/internal/querynode/load_service.go
+++ b/internal/querynode/load_service.go
@@ -2,25 +2,12 @@ package querynode

 import (
     "context"
-    "errors"
     "fmt"
     "log"
-    "path"
-    "sort"
-    "strconv"
-    "strings"
+    "sync"
     "time"

-    "github.com/zilliztech/milvus-distributed/internal/kv"
-    minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
     "github.com/zilliztech/milvus-distributed/internal/msgstream"
-    "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
-    "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-    "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
-    "github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
-    "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
-    "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
-    "github.com/zilliztech/milvus-distributed/internal/storage"
 )

 const indexCheckInterval = 1
@@ -29,622 +16,126 @@ type loadService struct {
     ctx    context.Context
     cancel context.CancelFunc

-    replica collectionReplica
-
-    fieldIndexes   map[string][]*internalpb2.IndexStats
-    fieldStatsChan chan []*internalpb2.FieldStats
-
-    dmStream msgstream.MsgStream
-
-    masterClient MasterServiceInterface
-    dataClient   DataServiceInterface
-    indexClient  IndexServiceInterface
-
-    kv     kv.Base // minio kv
-    iCodec *storage.InsertCodec
-}
-
-type loadIndex struct {
-    segmentID  UniqueID
-    fieldID    int64
-    indexPaths []string
+    segLoader *segmentLoader
 }

 // -------------------------------------------- load index -------------------------------------------- //
 func (s *loadService) start() {
+    wg := &sync.WaitGroup{}
     for {
         select {
         case <-s.ctx.Done():
             return
         case <-time.After(indexCheckInterval * time.Second):
-            collectionIDs, segmentIDs := s.replica.getSealedSegments()
-            if len(collectionIDs) <= 0 {
-                continue
-            }
-            fmt.Println("do load index for segments:", segmentIDs)
-            for i := range collectionIDs {
-                // we don't need index id yet
-                _, buildID, err := s.getIndexInfo(collectionIDs[i], segmentIDs[i])
-                if err != nil {
-                    indexPaths, err := s.getIndexPaths(buildID)
-                    if err != nil {
-                        log.Println(err)
-                        continue
-                    }
-                    err = s.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
-                    if err != nil {
-                        log.Println(err)
-                        continue
-                    }
-                }
-            }
-            // sendQueryNodeStats
-            err := s.sendQueryNodeStats()
-            if err != nil {
-                log.Println(err)
-                continue
-            }
-        }
-    }
-}
-
-func (s *loadService) execute(l *loadIndex) error {
-    // 1. use msg's index paths to get index bytes
-    var err error
-    var indexBuffer [][]byte
-    var indexParams indexParam
-    var indexName string
-    var indexID UniqueID
-    fn := func() error {
-        indexBuffer, indexParams, indexName, indexID, err = s.loadIndex(l.indexPaths)
-        if err != nil {
-            return err
+            wg.Add(2)
+            go s.segLoader.indexLoader.doLoadIndex(wg)
+            go s.loadSegmentActively(wg)
+            wg.Wait()
         }
-        return nil
-    }
-    err = util.Retry(5, time.Millisecond*200, fn)
-    if err != nil {
-        return err
-    }
-    ok, err := s.checkIndexReady(indexParams, l)
-    if err != nil {
-        return err
-    }
-    if ok {
-        // no error
-        return errors.New("")
-    }
-    // 2. use index bytes and index path to update segment
-    err = s.updateSegmentIndex(indexParams, indexBuffer, l)
-    if err != nil {
-        return err
-    }
-    // 3. update segment index stats
-    err = s.updateSegmentIndexStats(indexParams, indexName, indexID, l)
-    if err != nil {
-        return err
     }
-    fmt.Println("load index done")
-    return nil
 }

 func (s *loadService) close() {
     s.cancel()
 }

-func (s *loadService) printIndexParams(index []*commonpb.KeyValuePair) {
-    fmt.Println("=================================================")
-    for i := 0; i < len(index); i++ {
-        fmt.Println(index[i])
-    }
-}
-
-func (s *loadService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
-    if len(index1) != len(index2) {
-        return false
-    }
-
-    for i := 0; i < len(index1); i++ {
-        kv1 := *index1[i]
-        kv2 := *index2[i]
-        if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
-            return false
-        }
-    }
-
-    return true
-}
-
-func (s *loadService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
-    return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
-}
-
-func (s *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
-    ids := strings.Split(key, "/")
-    if len(ids) != 2 {
-        return 0, 0, errors.New("illegal fieldsStatsKey")
-    }
-    collectionID, err := strconv.ParseInt(ids[0], 10, 64)
-    if err != nil {
-        return 0, 0, err
-    }
-    fieldID, err := strconv.ParseInt(ids[1], 10, 64)
-    if err != nil {
-        return 0, 0, err
+func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
+    collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getEnabledSealedSegmentsBySegmentType(segTypeGrowing)
+    if len(collectionIDs) <= 0 {
+        wg.Done()
+        return
     }
-    return collectionID, fieldID, nil
-}
-
-func (s *loadService) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
-    targetSegment, err := s.replica.getSegmentByID(l.segmentID)
-    if err != nil {
-        return err
-    }
-
-    fieldStatsKey := s.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
-    _, ok := s.fieldIndexes[fieldStatsKey]
-    newIndexParams := make([]*commonpb.KeyValuePair, 0)
-    for k, v := range indexParams {
-        newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
-            Key:   k,
-            Value: v,
-        })
-    }
-
-    // sort index params by key
-    sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
-    if !ok {
-        s.fieldIndexes[fieldStatsKey] = make([]*internalpb2.IndexStats, 0)
-        s.fieldIndexes[fieldStatsKey] = append(s.fieldIndexes[fieldStatsKey],
-            &internalpb2.IndexStats{
-                IndexParams:        newIndexParams,
-                NumRelatedSegments: 1,
-            })
-    } else {
-        isNewIndex := true
-        for _, index := range s.fieldIndexes[fieldStatsKey] {
-            if s.indexParamsEqual(newIndexParams, index.IndexParams) {
-                index.NumRelatedSegments++
-                isNewIndex = false
-            }
-        }
-        if isNewIndex {
-            s.fieldIndexes[fieldStatsKey] = append(s.fieldIndexes[fieldStatsKey],
-                &internalpb2.IndexStats{
-                    IndexParams:        newIndexParams,
-                    NumRelatedSegments: 1,
-                })
-        }
-    }
-    err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
-    if err != nil {
-        return err
-    }
-    targetSegment.setIndexName(indexName)
-    targetSegment.setIndexID(indexID)
-
-    return nil
-}
-
-func (s *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
-    index := make([][]byte, 0)
-
-    var indexParams indexParam
-    var indexName string
-    var indexID UniqueID
-    for _, p := range indexPath {
-        fmt.Println("load path = ", indexPath)
-        indexPiece, err := s.kv.Load(p)
+    fmt.Println("do load segment for growing segments:", segmentIDs)
+    for i := range collectionIDs {
+        fieldIDs, err := s.segLoader.replica.getFieldIDsByCollectionID(collectionIDs[i])
         if err != nil {
-            return nil, nil, "", -1, err
-        }
-        // get index params when detecting indexParamPrefix
-        if path.Base(p) == storage.IndexParamsFile {
-            indexCodec := storage.NewIndexCodec()
-            _, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
-                {
-                    Key:   storage.IndexParamsFile,
-                    Value: []byte(indexPiece),
-                },
-            })
-            if err != nil {
-                return nil, nil, "", -1, err
-            }
-        } else {
-            index = append(index, []byte(indexPiece))
+            log.Println(err)
+            continue
         }
-    }
-
-    if len(indexParams) <= 0 {
-        return nil, nil, "", -1, errors.New("cannot find index param")
-    }
-    return index, indexParams, indexName, indexID, nil
-}
-
-func (s *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
-    segment, err := s.replica.getSegmentByID(l.segmentID)
-    if err != nil {
-        return err
-    }
-
-    loadIndexInfo, err := newLoadIndexInfo()
-    defer deleteLoadIndexInfo(loadIndexInfo)
-    if err != nil {
-        return err
-    }
-    err = loadIndexInfo.appendFieldInfo(l.fieldID)
-    if err != nil {
-        return err
-    }
-    for k, v := range indexParams {
-        err = loadIndexInfo.appendIndexParam(k, v)
+        err = s.loadSegmentInternal(collectionIDs[i], partitionIDs[i], segmentIDs[i], fieldIDs)
         if err != nil {
-            return err
+            log.Println(err)
         }
     }
-    err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
+    // sendQueryNodeStats
+    err := s.segLoader.indexLoader.sendQueryNodeStats()
     if err != nil {
-        return err
+        log.Println(err)
+        wg.Done()
+        return
     }
-    return segment.updateSegmentIndex(loadIndexInfo)
-}

-func (s *loadService) sendQueryNodeStats() error {
-    resultFieldsStats := make([]*internalpb2.FieldStats, 0)
-    for fieldStatsKey, indexStats := range s.fieldIndexes {
-        colID, fieldID, err := s.fieldsStatsKey2IDs(fieldStatsKey)
-        if err != nil {
-            return err
-        }
-        fieldStats := internalpb2.FieldStats{
-            CollectionID: colID,
-            FieldID:      fieldID,
-            IndexStats:   indexStats,
-        }
-        resultFieldsStats = append(resultFieldsStats, &fieldStats)
-    }
-
-    s.fieldStatsChan <- resultFieldsStats
-    fmt.Println("sent field stats")
-    return nil
+    wg.Done()
 }

-func (s *loadService) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
-    segment, err := s.replica.getSegmentByID(l.segmentID)
-    if err != nil {
-        return false, err
-    }
-    if !segment.matchIndexParam(l.fieldID, indexParams) {
-        return false, nil
-    }
-    return true, nil
-}
-
-func (s *loadService) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
-    req := &milvuspb.DescribeSegmentRequest{
-        Base: &commonpb.MsgBase{
-            MsgType: commonpb.MsgType_kDescribeSegment,
-        },
-        CollectionID: collectionID,
-        SegmentID:    segmentID,
-    }
-    response, err := s.masterClient.DescribeSegment(req)
-    if err != nil {
-        return 0, 0, err
-    }
-    return response.IndexID, response.BuildID, nil
-}
-
-// -------------------------------------------- load segment -------------------------------------------- //
+// load segment passively
 func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error {
     // TODO: interim solution
     if len(fieldIDs) == 0 {
-        collection, err := s.replica.getCollectionByID(collectionID)
+        var err error
+        fieldIDs, err = s.segLoader.replica.getFieldIDsByCollectionID(collectionID)
         if err != nil {
             return err
         }
-        fieldIDs = make([]int64, 0)
-        for _, field := range collection.Schema().Fields {
-            fieldIDs = append(fieldIDs, field.FieldID)
-        }
     }
     for _, segmentID := range segmentIDs {
-        // we don't need index id yet
-        _, buildID, errIndex := s.getIndexInfo(collectionID, segmentID)
-        if errIndex == nil {
-            // we don't need load to vector fields
-            vectorFields, err := s.replica.getVecFieldsByCollectionID(segmentID)
-            if err != nil {
-                return err
-            }
-            fieldIDs = s.filterOutVectorFields(fieldIDs, vectorFields)
-        }
-        paths, srcFieldIDs, err := s.getInsertBinlogPaths(segmentID)
-        if err != nil {
-            return err
-        }
-
-        targetFields := s.getTargetFields(paths, srcFieldIDs, fieldIDs)
-        collection, err := s.replica.getCollectionByID(collectionID)
-        if err != nil {
-            return err
-        }
-        segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
-        err = s.loadSegmentFieldsData(segment, targetFields)
+        err := s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs)
         if err != nil {
-            return err
-        }
-        if errIndex == nil {
-            indexPaths, err := s.getIndexPaths(buildID)
-            if err != nil {
-                return err
-            }
-            err = s.loadIndexImmediate(segment, indexPaths)
-            if err != nil {
-                // TODO: return or continue?
-                return err
-            }
+            log.Println(err)
+            continue
         }
     }
     return nil
 }

-func (s *loadService) releaseSegment(segmentID UniqueID) error {
-    err := s.replica.removeSegment(segmentID)
-    return err
-}
-
-func (s *loadService) seekSegment(position *internalpb2.MsgPosition) error {
-    // TODO: open seek
-    //for _, position := range positions {
-    //    err := s.dmStream.Seek(position)
-    //    if err != nil {
-    //        return err
-    //    }
-    //}
-    return nil
-}
-
-func (s *loadService) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
-    if s.indexClient == nil {
-        return nil, errors.New("null index service client")
-    }
-
-    indexFilePathRequest := &indexpb.IndexFilePathsRequest{
-        // TODO: rename indexIDs to buildIDs
-        IndexBuildIDs: []UniqueID{indexBuildID},
-    }
-    pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest)
-    if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-        return nil, err
-    }
-
-    if len(pathResponse.FilePaths) <= 0 {
-        return nil, errors.New("illegal index file paths")
-    }
-
-    return pathResponse.FilePaths[0].IndexFilePaths, nil
-}
-
-func (s *loadService) loadIndexImmediate(segment *Segment, indexPaths []string) error {
-    // get vector field ids from schema to load index
-    vecFieldIDs, err := s.replica.getVecFieldsByCollectionID(segment.collectionID)
-    if err != nil {
-        return err
-    }
-    for _, id := range vecFieldIDs {
-        l := &loadIndex{
-            segmentID:  segment.ID(),
-            fieldID:    id,
-            indexPaths: indexPaths,
-        }
-
-        err = s.execute(l)
-        if err != nil {
-            return err
-        }
-        // replace segment
-        err = s.replica.replaceGrowingSegmentBySealedSegment(segment)
+func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error {
+    // we don't need index id yet
+    _, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
+    if errIndex == nil {
+        // we don't need to load the vector fields that are already indexed
+        vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
         if err != nil {
             return err
         }
+        fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, vectorFields)
     }
-    return nil
-}
-
-func (s *loadService) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
-    // get vector field ids from schema to load index
-    vecFieldIDs, err := s.replica.getVecFieldsByCollectionID(collectionID)
+    paths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segmentID)
     if err != nil {
         return err
     }
-    for _, id := range vecFieldIDs {
-        l := &loadIndex{
-            segmentID:  segmentID,
-            fieldID:    id,
-            indexPaths: indexPaths,
-        }
-
-        err = s.execute(l)
-        if err != nil {
-            return err
-        }
-    }
-
-    return nil
-}
-
-func (s *loadService) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
-    if s.dataClient == nil {
-        return nil, nil, errors.New("null data service client")
-    }
-
-    insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{
-        SegmentID: segmentID,
-    }
-    pathResponse, err := s.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
+    targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
+    collection, err := s.segLoader.replica.getCollectionByID(collectionID)
     if err != nil {
-        return nil, nil, err
-    }
-
-    if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
-        return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
-    }
-
-    return pathResponse.Paths, pathResponse.FieldIDs, nil
-}
-
-func (s *loadService) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 {
-    containsFunc := func(s []int64, e int64) bool {
-        for _, a := range s {
-            if a == e {
-                return true
-            }
-        }
-        return false
-    }
-    targetFields := make([]int64, 0)
-    for _, id := range fieldIDs {
-        if !containsFunc(vectorFields, id) {
-            targetFields = append(targetFields, id)
-        }
-    }
-    return targetFields
-}
-
-func (s *loadService) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
-    targetFields := make(map[int64]*internalpb2.StringList)
-
-    containsFunc := func(s []int64, e int64) bool {
-        for _, a := range s {
-            if a == e {
-                return true
-            }
-        }
-        return false
+    segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
+    err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
+    if err != nil {
+        return err
     }
-
-    for i, fieldID := range srcFieldIDS {
-        if containsFunc(dstFields, fieldID) {
-            targetFields[fieldID] = paths[i]
-        }
-    }
-
-    return targetFields
-}
-
-func (s *loadService) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
-    for id, p := range targetFields {
-        if id == timestampFieldID {
-            // seg core doesn't need timestamp field
-            continue
-        }
-
-        paths := p.Values
-        blobs := make([]*storage.Blob, 0)
-        for _, path := range paths {
-            binLog, err := s.kv.Load(path)
-            if err != nil {
-                // TODO: return or continue?
-                return err
-            }
-            blobs = append(blobs, &storage.Blob{
-                Key:   strconv.FormatInt(id, 10), // TODO: key???
-                Value: []byte(binLog),
-            })
-        }
-        _, _, insertData, err := s.iCodec.Deserialize(blobs)
+    if errIndex == nil {
+        indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
         if err != nil {
-            // TODO: return or continue
             return err
         }
-        if len(insertData.Data) != 1 {
-            return errors.New("we expect only one field in deserialized insert data")
-        }
-
-        for _, value := range insertData.Data {
-            var numRows int
-            var data interface{}
-
-            switch fieldData := value.(type) {
-            case *storage.BoolFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.Int8FieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.Int16FieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.Int32FieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.Int64FieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.FloatFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.DoubleFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case storage.StringFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.FloatVectorFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            case *storage.BinaryVectorFieldData:
-                numRows = fieldData.NumRows
-                data = fieldData.Data
-            default:
-                return errors.New("unexpected field data type")
-            }
-            err = segment.segmentLoadFieldData(id, numRows, data)
-            if err != nil {
-                // TODO: return or continue?
-                return err
-            }
+        err = s.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
+        if err != nil {
+            return err
         }
     }
-
-    return nil
+    // replace segment
+    return s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
 }

 func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
     ctx1, cancel := context.WithCancel(ctx)

-    option := &minioKV.Option{
-        Address:           Params.MinioEndPoint,
-        AccessKeyID:       Params.MinioAccessKeyID,
-        SecretAccessKeyID: Params.MinioSecretAccessKey,
-        UseSSL:            Params.MinioUseSSLStr,
-        CreateBucket:      true,
-        BucketName:        Params.MinioBucketName,
-    }
-
-    client, err := minioKV.NewMinIOKV(ctx1, option)
-    if err != nil {
-        panic(err)
-    }
+    segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)

     return &loadService{
         ctx:    ctx1,
         cancel: cancel,

-        replica: replica,
-
-        fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
-        fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
-
-        dmStream: dmStream,
-
-        masterClient: masterClient,
-        dataClient:   dataClient,
-        indexClient:  indexClient,
-
-        kv:     client,
-        iCodec: &storage.InsertCodec{},
+        segLoader: segLoader,
     }
 }
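Note: `start` now fans the two periodic jobs out on a shared `sync.WaitGroup` each tick instead of doing the work inline, which is why `doLoadIndex` and `loadSegmentActively` must call `wg.Done()` on every return path. A condensed sketch of the pattern (the `runPeriodically` helper is illustrative, not part of this patch):

```go
package main

import (
	"context"
	"sync"
	"time"
)

// Condensed form of loadService.start: each tick runs two jobs concurrently
// and waits for both before the next tick. If a job misses wg.Done() on any
// return path, Wait blocks forever and the loop stalls.
func runPeriodically(ctx context.Context, interval time.Duration, jobA, jobB func(*sync.WaitGroup)) {
	wg := &sync.WaitGroup{}
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			wg.Add(2)
			go jobA(wg)
			go jobB(wg)
			wg.Wait()
		}
	}
}
```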
diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go
index 4bcd100522618aa64a7ac12c0e570c62f979c303..247c76b3c4a72bec20c18d56341d82f67e9c4c55 100644
--- a/internal/querynode/load_service_test.go
+++ b/internal/querynode/load_service_test.go
@@ -1126,19 +1126,19 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
     paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
     assert.NoError(t, err)

-    fieldsMap := node.loadService.getTargetFields(paths, srcFieldIDs, fieldIDs)
+    fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
     assert.Equal(t, len(fieldsMap), 2)

     segment, err := node.replica.getSegmentByID(segmentID)
     assert.NoError(t, err)

-    err = node.loadService.loadSegmentFieldsData(segment, fieldsMap)
+    err = node.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap)
     assert.NoError(t, err)

     indexPaths, err := generateIndex(segmentID)
     assert.NoError(t, err)

-    err = node.loadService.loadIndexImmediate(segment, indexPaths)
+    err = node.loadService.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
     assert.NoError(t, err)

     // do search
diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go
index ee6abe3ba2e9f580474d78db7ce5caa31d3bd1a8..787d6fab1c443e8de4f13e5581b8abae5dbe8d8b 100644
--- a/internal/querynode/partition.go
+++ b/internal/querynode/partition.go
@@ -16,7 +16,7 @@ type Partition struct {
     collectionID UniqueID
     partitionID  UniqueID
     segmentIDs   []UniqueID
-    enableDM     bool
+    enable       bool
 }

 func (p *Partition) ID() UniqueID {
@@ -41,7 +41,7 @@ func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition {
     var newPartition = &Partition{
         collectionID: collectionID,
         partitionID:  partitionID,
-        enableDM:     false,
+        enable:       false,
     }

     return newPartition
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 3bf6c5abcb073318d54b92fdc67a09a3c219fa72..dedf615a7052af2dab9c9c625230388a230d9b64 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -148,7 +148,7 @@ func (node *QueryNode) Start() error {
     node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
     node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
     node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
-    node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.fieldStatsChan)
+    node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan)

     // start services
     go node.dataSyncService.start()
@@ -382,7 +382,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
     segmentIDs := in.SegmentIDs
     fieldIDs := in.FieldIDs

-    err := node.replica.enablePartitionDM(partitionID)
+    err := node.replica.enablePartition(partitionID)
     if err != nil {
         status := &commonpb.Status{
             ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -395,7 +395,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
     for i, state := range in.SegmentStates {
         if state.State == commonpb.SegmentState_SegmentGrowing {
             position := state.StartPosition
-            err = node.loadService.seekSegment(position)
+            err = node.loadService.segLoader.seekSegment(position)
             if err != nil {
                 status := &commonpb.Status{
                     ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -423,7 +423,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S

 func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
     for _, id := range in.PartitionIDs {
-        err := node.replica.enablePartitionDM(id)
+        err := node.replica.enablePartition(id)
         if err != nil {
             status := &commonpb.Status{
                 ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -435,7 +435,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm

     // release all fields in the segments
     for _, id := range in.SegmentIDs {
-        err := node.loadService.releaseSegment(id)
+        err := node.loadService.segLoader.releaseSegment(id)
         if err != nil {
             status := &commonpb.Status{
                 ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
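Note: the `enable` flag flipped by `enablePartition` is what `getEnabledPartitionIDsPrivate` consults, so a partition's segments stay invisible to the background loaders until `LoadSegments` enables it. A minimal sketch of that gating (the `loadIntoPartition` wrapper is hypothetical; it assumes the partition was already registered with `addPartition`):

```go
// Hypothetical wrapper: enable the partition first, then register the segment,
// so the next tick of loadService.start can pick it up.
func loadIntoPartition(replica collectionReplica, collectionID, partitionID, segmentID UniqueID) error {
	if err := replica.enablePartition(partitionID); err != nil {
		return err // e.g. the partition does not exist yet
	}
	return replica.addSegment(segmentID, partitionID, collectionID, segTypeGrowing)
}
```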
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
new file mode 100644
index 0000000000000000000000000000000000000000..883c96e1758d1e6cf57bcda804adce5a70589c7e
--- /dev/null
+++ b/internal/querynode/segment_loader.go
@@ -0,0 +1,211 @@
+package querynode
+
+import (
+    "context"
+    "errors"
+    "strconv"
+
+    "github.com/zilliztech/milvus-distributed/internal/kv"
+    minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+    "github.com/zilliztech/milvus-distributed/internal/msgstream"
+    "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+    "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+    "github.com/zilliztech/milvus-distributed/internal/storage"
+)
+
+// segmentLoader is only responsible for loading the field data from binlog
+type segmentLoader struct {
+    replica collectionReplica
+
+    dmStream msgstream.MsgStream
+
+    dataClient DataServiceInterface
+
+    kv     kv.Base // minio kv
+    iCodec *storage.InsertCodec
+
+    indexLoader *indexLoader
+}
+
+func (loader *segmentLoader) releaseSegment(segmentID UniqueID) error {
+    err := loader.replica.removeSegment(segmentID)
+    return err
+}
+
+func (loader *segmentLoader) seekSegment(position *internalpb2.MsgPosition) error {
+    // TODO: open seek
+    //for _, position := range positions {
+    //    err := s.dmStream.Seek(position)
+    //    if err != nil {
+    //        return err
+    //    }
+    //}
+    return nil
+}
+
+func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
+    if loader.dataClient == nil {
+        return nil, nil, errors.New("null data service client")
+    }
+
+    insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{
+        SegmentID: segmentID,
+    }
+
+    pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
+        return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
+    }
+
+    return pathResponse.Paths, pathResponse.FieldIDs, nil
+}
+
+func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 {
+    containsFunc := func(s []int64, e int64) bool {
+        for _, a := range s {
+            if a == e {
+                return true
+            }
+        }
+        return false
+    }
+    targetFields := make([]int64, 0)
+    for _, id := range fieldIDs {
+        if !containsFunc(vectorFields, id) {
+            targetFields = append(targetFields, id)
+        }
+    }
+    return targetFields
+}
+
+func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
+    targetFields := make(map[int64]*internalpb2.StringList)
+
+    containsFunc := func(s []int64, e int64) bool {
+        for _, a := range s {
+            if a == e {
+                return true
+            }
+        }
+        return false
+    }
+
+    for i, fieldID := range srcFieldIDS {
+        if containsFunc(dstFields, fieldID) {
+            targetFields[fieldID] = paths[i]
+        }
+    }
+
+    return targetFields
+}
+
+func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
+    for id, p := range targetFields {
+        if id == timestampFieldID {
+            // seg core doesn't need timestamp field
+            continue
+        }
+
+        paths := p.Values
+        blobs := make([]*storage.Blob, 0)
+        for _, path := range paths {
+            binLog, err := loader.kv.Load(path)
+            if err != nil {
+                // TODO: return or continue?
+                return err
+            }
+            blobs = append(blobs, &storage.Blob{
+                Key:   strconv.FormatInt(id, 10), // TODO: key???
+                Value: []byte(binLog),
+            })
+        }
+        _, _, insertData, err := loader.iCodec.Deserialize(blobs)
+        if err != nil {
+            // TODO: return or continue
+            return err
+        }
+        if len(insertData.Data) != 1 {
+            return errors.New("we expect only one field in deserialized insert data")
+        }
+
+        for _, value := range insertData.Data {
+            var numRows int
+            var data interface{}
+
+            switch fieldData := value.(type) {
+            case *storage.BoolFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.Int8FieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.Int16FieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.Int32FieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.Int64FieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.FloatFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.DoubleFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case storage.StringFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.FloatVectorFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            case *storage.BinaryVectorFieldData:
+                numRows = fieldData.NumRows
+                data = fieldData.Data
+            default:
+                return errors.New("unexpected field data type")
+            }
+            err = segment.segmentLoadFieldData(id, numRows, data)
+            if err != nil {
+                // TODO: return or continue?
+                return err
+            }
+        }
+    }
+    return nil
+}
+
+func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
+    option := &minioKV.Option{
+        Address:           Params.MinioEndPoint,
+        AccessKeyID:       Params.MinioAccessKeyID,
+        SecretAccessKeyID: Params.MinioSecretAccessKey,
+        UseSSL:            Params.MinioUseSSLStr,
+        CreateBucket:      true,
+        BucketName:        Params.MinioBucketName,
+    }
+
+    client, err := minioKV.NewMinIOKV(ctx, option)
+    if err != nil {
+        panic(err)
+    }
+
+    iLoader := newIndexLoader(ctx, masterClient, indexClient, replica)
+    return &segmentLoader{
+        replica: replica,
+
+        dmStream: dmStream,
+
+        dataClient: dataClient,
+
+        kv:     client,
+        iCodec: &storage.InsertCodec{},
+
+        indexLoader: iLoader,
+    }
+}
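Note: `filterOutVectorFields` and `getTargetFields` together decide which binlogs a segment reads: vector fields are dropped when an index already exists, and the surviving field IDs are matched positionally against the binlog path list. An illustration with made-up IDs and paths (the function, the IDs, and the `minio://` paths are all hypothetical; an `fmt` import is assumed):

```go
// Hypothetical walk-through: field 102 is a vector field with an existing
// index, so only fields 100 and 101 keep their binlog paths.
func sketchTargetFieldSelection(loader *segmentLoader) {
	fieldIDs := []int64{100, 101, 102}
	vectorFields := []int64{102}
	scalarOnly := loader.filterOutVectorFields(fieldIDs, vectorFields) // [100 101]

	srcFieldIDs := []int64{100, 101, 102}
	paths := []*internalpb2.StringList{
		{Values: []string{"minio://binlog/100"}},
		{Values: []string{"minio://binlog/101"}},
		{Values: []string{"minio://binlog/102"}},
	}
	targets := loader.getTargetFields(paths, srcFieldIDs, scalarOnly)
	fmt.Println(len(targets)) // 2: only fields 100 and 101 will be loaded
}
```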