Commit e9ee9a27, authored by bigsheeper, committed by yefu.chen

Refactor load service and check insertion binLog periodically

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Parent 0111cba8
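For orientation, the refactor below splits the old monolithic loadService into a segmentLoader and an indexLoader and drives both from one polling loop. A standalone sketch of that pattern (names and the interval are illustrative stand-ins, not the actual querynode types):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// start polls on a fixed interval; each tick fans out the index check and the
// insert-binlog check, then waits for both before starting the next round.
func start(ctx context.Context, interval time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			wg := &sync.WaitGroup{}
			wg.Add(2)
			go func() { defer wg.Done(); fmt.Println("check built indexes") }()
			go func() { defer wg.Done(); fmt.Println("check insert binlogs") }()
			wg.Wait()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	start(ctx, time.Second)
}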
@@ -30,6 +30,7 @@ import (
*/
type collectionReplica interface {
// collection
getCollectionIDs() []UniqueID
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
@@ -37,7 +38,8 @@ type collectionReplica interface {
getCollectionNum() int
getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
// partition
addPartition(collectionID UniqueID, partitionID UniqueID) error
@@ -47,9 +49,8 @@ type collectionReplica interface {
getPartitionNum() int
getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)
enablePartition(partitionID UniqueID) error
disablePartition(partitionID UniqueID) error
// segment
addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
@@ -59,7 +60,7 @@ type collectionReplica interface {
getSegmentNum() int
getSegmentStatistics() []*internalpb2.SegmentStats
getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
replaceGrowingSegmentBySealedSegment(segment *Segment) error
getTSafe() tSafe
@@ -76,6 +77,16 @@ type collectionReplicaImpl struct {
}
//----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
collectionIDs := make([]UniqueID, 0)
for id := range colReplica.collections {
collectionIDs = append(collectionIDs, id)
}
return collectionIDs
}
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@@ -158,29 +169,59 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
return collection.partitionIDs, nil
}
func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
if err != nil {
return nil, err
}
vecFields := make([]int64, 0)
for _, field := range fields {
if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT {
vecFields = append(vecFields, field.FieldID)
}
}
if len(vecFields) <= 0 {
return nil, errors.New("no vector field in segment " + strconv.FormatInt(collectionID, 10))
return nil, errors.New("no vector field in collection " + strconv.FormatInt(collectionID, 10))
}
return vecFields, nil
}
func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
if err != nil {
return nil, err
}
targetFields := make([]int64, 0)
for _, field := range fields {
targetFields = append(targetFields, field.FieldID)
}
return targetFields, nil
}
func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return nil, err
}
if len(collection.Schema().Fields) <= 0 {
return nil, errors.New("no field in collection " + strconv.FormatInt(collectionID, 10))
}
return collection.Schema().Fields, nil
}
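Both accessors above reduce to a filter over the fields returned by getFieldsByCollectionIDPrivate. A standalone sketch of the vector-field filter, with a stand-in type in place of the real schemapb.FieldSchema:

package main

import "fmt"

// field is a stand-in for schemapb.FieldSchema; only the members the filter
// needs are modeled (the real code checks DataType against VECTOR_BINARY and
// VECTOR_FLOAT instead of a boolean).
type field struct {
	FieldID  int64
	IsVector bool
}

// vecFieldIDs mirrors getVecFieldIDsByCollectionID: keep only vector fields.
func vecFieldIDs(fields []field) []int64 {
	ids := make([]int64, 0)
	for _, f := range fields {
		if f.IsVector {
			ids = append(ids, f.FieldID)
		}
	}
	return ids
}

func main() {
	fields := []field{{100, false}, {101, true}, {102, true}}
	fmt.Println(vecFieldIDs(fields)) // [101 102]
}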
//----------------------------------------------------------------------------------------------------- partition
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
colReplica.mu.Lock()
@@ -263,7 +304,10 @@ func (colReplica *collectionReplicaImpl) getPartitionNum() int {
func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getSegmentIDsPrivate(partitionID)
}
func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
if err2 != nil {
return nil, err2
@@ -271,7 +315,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]
return partition.segmentIDs, nil
}
func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@@ -280,11 +324,11 @@ func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID)
return err
}
partition.enable = true
return nil
}
func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@@ -293,19 +337,18 @@ func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID
return err
}
partition.enable = false
return nil
}
func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
partitionIDs := make([]UniqueID, 0)
for _, partition := range colReplica.partitions {
if partition.enable {
partitionIDs = append(partitionIDs, partition.partitionID)
}
}
return partitionIDs
}
//----------------------------------------------------------------------------------------------------- segment
@@ -414,20 +457,33 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
return statisticData
}
func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
targetCollectionIDs := make([]UniqueID, 0)
targetPartitionIDs := make([]UniqueID, 0)
targetSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range colReplica.getEnabledPartitionIDsPrivate() {
segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID)
if err != nil {
continue
}
for _, segmentID := range segmentIDs {
segment, err := colReplica.getSegmentByIDPrivate(segmentID)
if err != nil {
continue
}
if segment.getType() == segType {
targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
				targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
}
}
}
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}
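getEnabledSealedSegmentsBySegmentType returns three parallel slices, one entry per matching segment, so callers iterate them together by index. A self-contained model of the enabled-partition and segment-type filter chain, using stand-in types for the replica's internal maps:

package main

import "fmt"

// Stand-ins for the replica internals (the real code holds these under an
// RWMutex and uses UniqueID = int64).
type segment struct {
	collectionID, partitionID, segmentID int64
	sealed                               bool
}

type partition struct {
	partitionID int64
	enable      bool
	segmentIDs  []int64
}

// enabledSealed mirrors getEnabledSealedSegmentsBySegmentType: walk enabled
// partitions, then their segments, keeping those of the requested type.
func enabledSealed(parts []partition, segs map[int64]segment) (cols, partIDs, segIDs []int64) {
	for _, p := range parts {
		if !p.enable {
			continue
		}
		for _, sid := range p.segmentIDs {
			s, ok := segs[sid]
			if !ok || !s.sealed {
				continue
			}
			cols = append(cols, s.collectionID)
			partIDs = append(partIDs, s.partitionID)
			segIDs = append(segIDs, s.segmentID)
		}
	}
	return
}

func main() {
	segs := map[int64]segment{
		100: {1, 10, 100, true},
		101: {1, 10, 101, false},
		200: {1, 11, 200, true},
	}
	parts := []partition{{10, true, []int64{100, 101}}, {11, false, []int64{200}}}
	c, p, s := enabledSealed(parts, segs)
	fmt.Println(c, p, s) // [1] [10] [100]
}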
func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
......
package querynode
import (
"context"
"errors"
"fmt"
"log"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/kv"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
type indexLoader struct {
replica collectionReplica
fieldIndexes map[string][]*internalpb2.IndexStats
fieldStatsChan chan []*internalpb2.FieldStats
masterClient MasterServiceInterface
indexClient IndexServiceInterface
kv kv.Base // minio kv
}
type loadIndex struct {
segmentID UniqueID
fieldID int64
indexPaths []string
}
func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
collectionIDs, _, segmentIDs := loader.replica.getEnabledSealedSegmentsBySegmentType(segTypeSealed)
	if len(collectionIDs) <= 0 {
		wg.Done()
		return
	}
fmt.Println("do load index for sealed segments:", segmentIDs)
for i := range collectionIDs {
// we don't need index id yet
_, buildID, err := loader.getIndexInfo(collectionIDs[i], segmentIDs[i])
		if err == nil {
indexPaths, err := loader.getIndexPaths(buildID)
if err != nil {
log.Println(err)
continue
}
err = loader.loadIndexDelayed(collectionIDs[i], segmentIDs[i], indexPaths)
if err != nil {
log.Println(err)
continue
}
}
}
// sendQueryNodeStats
err := loader.sendQueryNodeStats()
if err != nil {
log.Println(err)
wg.Done()
return
}
wg.Done()
}
func (loader *indexLoader) execute(l *loadIndex) error {
// 1. use msg's index paths to get index bytes
var err error
var indexBuffer [][]byte
var indexParams indexParam
var indexName string
var indexID UniqueID
fn := func() error {
indexBuffer, indexParams, indexName, indexID, err = loader.loadIndex(l.indexPaths)
if err != nil {
return err
}
return nil
}
err = util.Retry(5, time.Millisecond*200, fn)
if err != nil {
return err
}
ok, err := loader.checkIndexReady(indexParams, l)
if err != nil {
return err
}
if ok {
		// index already loaded for this segment; return a sentinel (empty) error
		// so callers stop processing it
return errors.New("")
}
// 2. use index bytes and index path to update segment
err = loader.updateSegmentIndex(indexParams, indexBuffer, l)
if err != nil {
return err
}
// 3. update segment index stats
err = loader.updateSegmentIndexStats(indexParams, indexName, indexID, l)
if err != nil {
return err
}
fmt.Println("load index done")
return nil
}
func (loader *indexLoader) printIndexParams(index []*commonpb.KeyValuePair) {
fmt.Println("=================================================")
for i := 0; i < len(index); i++ {
fmt.Println(index[i])
}
}
func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
if len(index1) != len(index2) {
return false
}
for i := 0; i < len(index1); i++ {
kv1 := *index1[i]
kv2 := *index2[i]
if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
return false
}
}
return true
}
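Note that indexParamsEqual compares positionally, so it only behaves as set equality when both sides share one ordering; updateSegmentIndexStats below sorts by key before storing and comparing. A standalone illustration:

package main

import (
	"fmt"
	"sort"
)

type kv struct{ Key, Value string }

// equal mirrors indexParamsEqual: pairwise, order-sensitive comparison.
func equal(a, b []kv) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func sortByKey(s []kv) {
	sort.Slice(s, func(i, j int) bool { return s[i].Key < s[j].Key })
}

func main() {
	a := []kv{{"metric_type", "L2"}, {"nlist", "128"}}
	b := []kv{{"nlist", "128"}, {"metric_type", "L2"}}
	fmt.Println(equal(a, b)) // false: same pairs, different order
	sortByKey(a)
	sortByKey(b)
	fmt.Println(equal(a, b)) // true after canonical sorting
}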
func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
}
func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
ids := strings.Split(key, "/")
if len(ids) != 2 {
return 0, 0, errors.New("illegal fieldsStatsKey")
}
collectionID, err := strconv.ParseInt(ids[0], 10, 64)
if err != nil {
return 0, 0, err
}
fieldID, err := strconv.ParseInt(ids[1], 10, 64)
if err != nil {
return 0, 0, err
}
return collectionID, fieldID, nil
}
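fieldsStatsIDs2Key and fieldsStatsKey2IDs must stay exact inverses, since sendQueryNodeStats decodes every key it previously encoded. A runnable round-trip check of the same "collectionID/fieldID" scheme:

package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

func toKey(collectionID, fieldID int64) string {
	return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
}

func fromKey(key string) (int64, int64, error) {
	ids := strings.Split(key, "/")
	if len(ids) != 2 {
		return 0, 0, errors.New("illegal fieldsStatsKey")
	}
	collectionID, err := strconv.ParseInt(ids[0], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	fieldID, err := strconv.ParseInt(ids[1], 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return collectionID, fieldID, nil
}

func main() {
	k := toKey(42, 101)
	c, f, err := fromKey(k)
	fmt.Println(k, c, f, err) // 42/101 42 101 <nil>
}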
func (loader *indexLoader) updateSegmentIndexStats(indexParams indexParam, indexName string, indexID UniqueID, l *loadIndex) error {
targetSegment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return err
}
fieldStatsKey := loader.fieldsStatsIDs2Key(targetSegment.collectionID, l.fieldID)
_, ok := loader.fieldIndexes[fieldStatsKey]
newIndexParams := make([]*commonpb.KeyValuePair, 0)
for k, v := range indexParams {
newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
}
// sort index params by key
sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
if !ok {
loader.fieldIndexes[fieldStatsKey] = make([]*internalpb2.IndexStats, 0)
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb2.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
} else {
isNewIndex := true
for _, index := range loader.fieldIndexes[fieldStatsKey] {
if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
index.NumRelatedSegments++
isNewIndex = false
}
}
if isNewIndex {
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb2.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
}
}
err = targetSegment.setIndexParam(l.fieldID, newIndexParams)
if err != nil {
return err
}
targetSegment.setIndexName(indexName)
targetSegment.setIndexID(indexID)
return nil
}
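The bookkeeping above either increments NumRelatedSegments for an already-known parameter set or appends a new IndexStats entry. A trimmed standalone model, where a joined string stands in for the sorted parameter list:

package main

import "fmt"

// indexStats is a stand-in for internalpb2.IndexStats.
type indexStats struct {
	params             string
	numRelatedSegments int64
}

// record bumps the counter of an existing param set or appends a new entry.
func record(stats []indexStats, params string) []indexStats {
	for i := range stats {
		if stats[i].params == params {
			stats[i].numRelatedSegments++
			return stats
		}
	}
	return append(stats, indexStats{params: params, numRelatedSegments: 1})
}

func main() {
	var stats []indexStats
	stats = record(stats, "metric_type=L2,nlist=128")
	stats = record(stats, "metric_type=L2,nlist=128")
	stats = record(stats, "metric_type=IP,nlist=256")
	fmt.Println(stats) // [{metric_type=L2,nlist=128 2} {metric_type=IP,nlist=256 1}]
}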
func (loader *indexLoader) loadIndex(indexPath []string) ([][]byte, indexParam, string, UniqueID, error) {
index := make([][]byte, 0)
var indexParams indexParam
var indexName string
var indexID UniqueID
for _, p := range indexPath {
fmt.Println("load path = ", indexPath)
indexPiece, err := loader.kv.Load(p)
if err != nil {
return nil, nil, "", -1, err
}
// get index params when detecting indexParamPrefix
if path.Base(p) == storage.IndexParamsFile {
indexCodec := storage.NewIndexCodec()
_, indexParams, indexName, indexID, err = indexCodec.Deserialize([]*storage.Blob{
{
Key: storage.IndexParamsFile,
Value: []byte(indexPiece),
},
})
if err != nil {
return nil, nil, "", -1, err
}
} else {
index = append(index, []byte(indexPiece))
}
}
if len(indexParams) <= 0 {
return nil, nil, "", -1, errors.New("cannot find index param")
}
return index, indexParams, indexName, indexID, nil
}
func (loader *indexLoader) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, l *loadIndex) error {
segment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return err
}
loadIndexInfo, err := newLoadIndexInfo()
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {
return err
}
err = loadIndexInfo.appendFieldInfo(l.fieldID)
if err != nil {
return err
}
for k, v := range indexParams {
err = loadIndexInfo.appendIndexParam(k, v)
if err != nil {
return err
}
}
err = loadIndexInfo.appendIndex(bytesIndex, l.indexPaths)
if err != nil {
return err
}
return segment.updateSegmentIndex(loadIndexInfo)
}
func (loader *indexLoader) sendQueryNodeStats() error {
resultFieldsStats := make([]*internalpb2.FieldStats, 0)
for fieldStatsKey, indexStats := range loader.fieldIndexes {
colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey)
if err != nil {
return err
}
fieldStats := internalpb2.FieldStats{
CollectionID: colID,
FieldID: fieldID,
IndexStats: indexStats,
}
resultFieldsStats = append(resultFieldsStats, &fieldStats)
}
loader.fieldStatsChan <- resultFieldsStats
fmt.Println("sent field stats")
return nil
}
func (loader *indexLoader) checkIndexReady(indexParams indexParam, l *loadIndex) (bool, error) {
segment, err := loader.replica.getSegmentByID(l.segmentID)
if err != nil {
return false, err
}
if !segment.matchIndexParam(l.fieldID, indexParams) {
return false, nil
}
return true, nil
}
func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
req := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeSegment,
},
CollectionID: collectionID,
SegmentID: segmentID,
}
response, err := loader.masterClient.DescribeSegment(req)
if err != nil {
return 0, 0, err
}
return response.IndexID, response.BuildID, nil
}
func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
if loader.indexClient == nil {
return nil, errors.New("null index service client")
}
indexFilePathRequest := &indexpb.IndexFilePathsRequest{
// TODO: rename indexIDs to buildIDs
IndexBuildIDs: []UniqueID{indexBuildID},
}
pathResponse, err := loader.indexClient.GetIndexFilePaths(indexFilePathRequest)
	if err != nil {
		return nil, err
	}
	if pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
		return nil, errors.New("GetIndexFilePaths failed: " + pathResponse.Status.Reason)
	}
if len(pathResponse.FilePaths) <= 0 {
return nil, errors.New("illegal index file paths")
}
return pathResponse.FilePaths[0].IndexFilePaths, nil
}
func (loader *indexLoader) loadIndexImmediate(segment *Segment, indexPaths []string) error {
// get vector field ids from schema to load index
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(segment.collectionID)
if err != nil {
return err
}
for _, id := range vecFieldIDs {
l := &loadIndex{
segmentID: segment.ID(),
fieldID: id,
indexPaths: indexPaths,
}
err = loader.execute(l)
if err != nil {
return err
}
}
return nil
}
func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, indexPaths []string) error {
// get vector field ids from schema to load index
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionID)
if err != nil {
return err
}
for _, id := range vecFieldIDs {
l := &loadIndex{
segmentID: segmentID,
fieldID: id,
indexPaths: indexPaths,
}
err = loader.execute(l)
if err != nil {
return err
}
}
return nil
}
func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
client, err := minioKV.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
return &indexLoader{
replica: replica,
fieldIndexes: make(map[string][]*internalpb2.IndexStats),
fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
masterClient: masterClient,
indexClient: indexClient,
kv: client,
}
}
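The MinIO option block here is duplicated verbatim in newSegmentLoader below (and was in the old newLoadService). A possible consolidation, sketched as a hypothetical helper that is not part of this commit and reuses only identifiers already present in this file:

// newMinioKV is a hypothetical helper; newIndexLoader above and
// newSegmentLoader below each build this exact option block from Params.
func newMinioKV(ctx context.Context) (kv.Base, error) {
	option := &minioKV.Option{
		Address:           Params.MinioEndPoint,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSLStr,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}
	return minioKV.NewMinIOKV(ctx, option)
}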
@@ -2,25 +2,12 @@ package querynode

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
)

const indexCheckInterval = 1
@@ -29,622 +16,126 @@ type loadService struct {
	ctx    context.Context
	cancel context.CancelFunc

	segLoader *segmentLoader
}

// -------------------------------------------- load index -------------------------------------------- //
func (s *loadService) start() {
	wg := &sync.WaitGroup{}
	for {
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(indexCheckInterval * time.Second):
			wg.Add(2)
			go s.segLoader.indexLoader.doLoadIndex(wg)
			go s.loadSegmentActively(wg)
			wg.Wait()
		}
	}
}

func (s *loadService) close() {
	s.cancel()
}

func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
	collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getEnabledSealedSegmentsBySegmentType(segTypeGrowing)
	if len(collectionIDs) <= 0 {
		wg.Done()
		return
	}
	fmt.Println("do load segment for growing segments:", segmentIDs)
	for i := range collectionIDs {
		fieldIDs, err := s.segLoader.replica.getFieldIDsByCollectionID(collectionIDs[i])
		if err != nil {
			log.Println(err)
			continue
		}
		err = s.loadSegmentInternal(collectionIDs[i], partitionIDs[i], segmentIDs[i], fieldIDs)
		if err != nil {
			log.Println(err)
		}
	}
	// sendQueryNodeStats
	err := s.segLoader.indexLoader.sendQueryNodeStats()
	if err != nil {
		log.Println(err)
		wg.Done()
		return
	}
	wg.Done()
}

// -------------------------------------------- load segment -------------------------------------------- //
// load segment passively
func (s *loadService) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error {
	// TODO: interim solution
	if len(fieldIDs) == 0 {
		var err error
		fieldIDs, err = s.segLoader.replica.getFieldIDsByCollectionID(collectionID)
		if err != nil {
			return err
		}
	}
	for _, segmentID := range segmentIDs {
		err := s.loadSegmentInternal(collectionID, partitionID, segmentID, fieldIDs)
		if err != nil {
			log.Println(err)
			continue
		}
	}
	return nil
}

func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, fieldIDs []int64) error {
	// we don't need index id yet
	_, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
	if errIndex == nil {
		// no need to load vector fields from binlog; the built index is loaded instead
		vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
		if err != nil {
			return err
		}
		fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, vectorFields)
	}
	paths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segmentID)
	if err != nil {
		return err
	}
	targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
	collection, err := s.segLoader.replica.getCollectionByID(collectionID)
	if err != nil {
		return err
	}
	segment := newSegment(collection, segmentID, partitionID, collectionID, segTypeSealed)
	err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
	if err != nil {
		return err
	}
	if errIndex == nil {
		indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
		if err != nil {
			return err
		}
		err = s.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
		if err != nil {
			return err
		}
	}
	// replace the growing segment in the replica with the freshly sealed one
	return s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
}

func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
	ctx1, cancel := context.WithCancel(ctx)

	segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)

	return &loadService{
		ctx:    ctx1,
		cancel: cancel,

		segLoader: segLoader,
	}
}
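For orientation, a hypothetical in-package usage sketch of the refactored service, assuming the node's clients, replica, and dmStream are already constructed (cf. QueryNode.Start below):

// Mirrors QueryNode.Start: build the service, run its periodic loop, stop via close().
loadSvc := newLoadService(ctx, masterClient, dataClient, indexClient, replica, dmStream)
go loadSvc.start() // each tick: index check + active segment load
defer loadSvc.close()

// The passive path still exists: an explicit LoadSegments request loads
// specific segments directly.
if err := loadSvc.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs); err != nil {
	log.Println(err)
}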
@@ -1126,19 +1126,19 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
assert.NoError(t, err)
fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
assert.Equal(t, len(fieldsMap), 2)
segment, err := node.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
err = node.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap)
assert.NoError(t, err)
indexPaths, err := generateIndex(segmentID)
assert.NoError(t, err)
err = node.loadService.segLoader.indexLoader.loadIndexImmediate(segment, indexPaths)
assert.NoError(t, err)
// do search
......
@@ -16,7 +16,7 @@ type Partition struct {
collectionID UniqueID
partitionID UniqueID
segmentIDs []UniqueID
enable bool
}
func (p *Partition) ID() UniqueID {
@@ -41,7 +41,7 @@ func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition {
var newPartition = &Partition{
collectionID: collectionID,
partitionID: partitionID,
enable: false,
}
return newPartition
......
@@ -148,7 +148,7 @@ func (node *QueryNode) Start() error {
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan)
// start services
go node.dataSyncService.start()
@@ -382,7 +382,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
segmentIDs := in.SegmentIDs
fieldIDs := in.FieldIDs
err := node.replica.enablePartition(partitionID)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -395,7 +395,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
for i, state := range in.SegmentStates {
if state.State == commonpb.SegmentState_SegmentGrowing {
position := state.StartPosition
err = node.loadService.segLoader.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -423,7 +423,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
for _, id := range in.PartitionIDs {
err := node.replica.enablePartition(id)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -435,7 +435,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm
// release all fields in the segments
for _, id := range in.SegmentIDs {
err := node.loadService.segLoader.releaseSegment(id)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......
package querynode
import (
"context"
"errors"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/kv"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
replica collectionReplica
dmStream msgstream.MsgStream
dataClient DataServiceInterface
kv kv.Base // minio kv
iCodec *storage.InsertCodec
indexLoader *indexLoader
}
func (loader *segmentLoader) releaseSegment(segmentID UniqueID) error {
err := loader.replica.removeSegment(segmentID)
return err
}
func (loader *segmentLoader) seekSegment(position *internalpb2.MsgPosition) error {
// TODO: open seek
//for _, position := range positions {
// err := s.dmStream.Seek(position)
// if err != nil {
// return err
// }
//}
return nil
}
func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
if loader.dataClient == nil {
return nil, nil, errors.New("null data service client")
}
insertBinlogPathRequest := &datapb.InsertBinlogPathRequest{
SegmentID: segmentID,
}
pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
if err != nil {
return nil, nil, err
}
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
}
return pathResponse.Paths, pathResponse.FieldIDs, nil
}
func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 {
containsFunc := func(s []int64, e int64) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
targetFields := make([]int64, 0)
for _, id := range fieldIDs {
if !containsFunc(vectorFields, id) {
targetFields = append(targetFields, id)
}
}
return targetFields
}
func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
targetFields := make(map[int64]*internalpb2.StringList)
containsFunc := func(s []int64, e int64) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
for i, fieldID := range srcFieldIDS {
if containsFunc(dstFields, fieldID) {
targetFields[fieldID] = paths[i]
}
}
return targetFields
}
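srcFieldIDS[i] owns paths[i] (getInsertBinlogPaths validates that the two slices have equal length), so getTargetFields is an intersection keyed by field ID. A standalone sketch that swaps the linear containsFunc for a set lookup; []string stands in for internalpb2.StringList:

package main

import "fmt"

// targetFields keeps only the per-field binlog paths the caller asked for.
func targetFields(paths [][]string, srcFieldIDs, dstFields []int64) map[int64][]string {
	want := make(map[int64]bool, len(dstFields))
	for _, id := range dstFields {
		want[id] = true
	}
	out := make(map[int64][]string)
	for i, id := range srcFieldIDs {
		if want[id] {
			out[id] = paths[i]
		}
	}
	return out
}

func main() {
	paths := [][]string{{"binlog/0/a"}, {"binlog/1/a", "binlog/1/b"}}
	fmt.Println(targetFields(paths, []int64{0, 1}, []int64{1}))
	// map[1:[binlog/1/a binlog/1/b]]
}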
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
for id, p := range targetFields {
if id == timestampFieldID {
// seg core doesn't need timestamp field
continue
}
paths := p.Values
blobs := make([]*storage.Blob, 0)
for _, path := range paths {
binLog, err := loader.kv.Load(path)
if err != nil {
// TODO: return or continue?
return err
}
blobs = append(blobs, &storage.Blob{
Key: strconv.FormatInt(id, 10), // TODO: key???
Value: []byte(binLog),
})
}
_, _, insertData, err := loader.iCodec.Deserialize(blobs)
if err != nil {
// TODO: return or continue
return err
}
if len(insertData.Data) != 1 {
return errors.New("we expect only one field in deserialized insert data")
}
for _, value := range insertData.Data {
var numRows int
var data interface{}
switch fieldData := value.(type) {
case *storage.BoolFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int8FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int16FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int32FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.Int64FieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.FloatFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.DoubleFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
			case *storage.StringFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.FloatVectorFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
case *storage.BinaryVectorFieldData:
numRows = fieldData.NumRows
data = fieldData.Data
default:
return errors.New("unexpected field data type")
}
err = segment.segmentLoadFieldData(id, numRows, data)
if err != nil {
// TODO: return or continue?
return err
}
}
}
return nil
}
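Every branch of the switch above unwraps the same two members, NumRows and Data, so segcore can ingest any field through one generic call. A trimmed, runnable model of that dispatch with two stand-in field-data types:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for two of the storage field-data types; each deserialized blob
// carries its row count and a typed payload.
type int64FieldData struct {
	NumRows int
	Data    []int64
}

type floatVectorFieldData struct {
	NumRows int
	Data    []float32
}

// extract mirrors the switch in loadSegmentFieldsData: unwrap NumRows and the
// raw payload so a single load call can take them.
func extract(value interface{}) (int, interface{}, error) {
	switch fieldData := value.(type) {
	case *int64FieldData:
		return fieldData.NumRows, fieldData.Data, nil
	case *floatVectorFieldData:
		return fieldData.NumRows, fieldData.Data, nil
	default:
		return 0, nil, errors.New("unexpected field data type")
	}
}

func main() {
	n, d, err := extract(&int64FieldData{NumRows: 2, Data: []int64{7, 8}})
	fmt.Println(n, d, err) // 2 [7 8] <nil>
}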
func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
client, err := minioKV.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
iLoader := newIndexLoader(ctx, masterClient, indexClient, replica)
return &segmentLoader{
replica: replica,
dmStream: dmStream,
dataClient: dataClient,
kv: client,
iCodec: &storage.InsertCodec{},
indexLoader: iLoader,
}
}