未验证 提交 39f8c9dd 编写于 作者: C Cai Yudong 提交者: GitHub

Save msg start position and end position when AddSegment and AddIndex (#5358)


Resolves: #5332 
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 3f42a9ed
...@@ -114,11 +114,11 @@ type Core struct { ...@@ -114,11 +114,11 @@ type Core struct {
//setMsgStreams, send drop partition into dd channel //setMsgStreams, send drop partition into dd channel
SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error
//setMsgStreams segment channel, receive segment info from data service, if master create segment // if master create segment, data service will put segment msg into this channel
DataServiceSegmentChan chan *datapb.SegmentInfo DataServiceSegmentChan <-chan *ms.MsgPack
//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream // if segment flush completed, data node would put segment msg into this channel
DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID DataNodeFlushedSegmentChan <-chan *ms.MsgPack
//get binlog file path from data service, //get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
...@@ -214,9 +214,6 @@ func (c *Core) checkInit() error { ...@@ -214,9 +214,6 @@ func (c *Core) checkInit() error {
if c.SendDdDropPartitionReq == nil { if c.SendDdDropPartitionReq == nil {
return fmt.Errorf("SendDdDropPartitionReq is nil") return fmt.Errorf("SendDdDropPartitionReq is nil")
} }
if c.DataServiceSegmentChan == nil {
return fmt.Errorf("DataServiceSegmentChan is nil")
}
if c.GetBinlogFilePathsFromDataServiceReq == nil { if c.GetBinlogFilePathsFromDataServiceReq == nil {
return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil") return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
} }
...@@ -232,8 +229,11 @@ func (c *Core) checkInit() error { ...@@ -232,8 +229,11 @@ func (c *Core) checkInit() error {
if c.InvalidateCollectionMetaCache == nil { if c.InvalidateCollectionMetaCache == nil {
return fmt.Errorf("InvalidateCollectionMetaCache is nil") return fmt.Errorf("InvalidateCollectionMetaCache is nil")
} }
if c.DataNodeSegmentFlushCompletedChan == nil { if c.DataServiceSegmentChan == nil {
return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil") return fmt.Errorf("DataServiceSegmentChan is nil")
}
if c.DataNodeFlushedSegmentChan == nil {
return fmt.Errorf("DataNodeFlushedSegmentChan is nil")
} }
if c.ReleaseCollection == nil { if c.ReleaseCollection == nil {
return fmt.Errorf("ReleaseCollection is nil") return fmt.Errorf("ReleaseCollection is nil")
...@@ -307,71 +307,127 @@ func (c *Core) startTimeTickLoop() { ...@@ -307,71 +307,127 @@ func (c *Core) startTimeTickLoop() {
} }
} }
//data service send segment info to master when create segment // data service send segment info msg to master when create segment
func (c *Core) startDataServiceSegmentLoop() { func (c *Core) startDataServiceSegmentLoop() {
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
log.Debug("close data service segment loop") log.Debug("close data service segment loop")
return return
case seg, ok := <-c.DataServiceSegmentChan: case segMsg, ok := <-c.DataServiceSegmentChan:
if !ok { if !ok {
log.Debug("data service segment is closed, exit loop") log.Debug("data service segment channel is closed, exit loop")
return return
} }
if seg == nil { var segInfos []*datapb.SegmentInfo
log.Warn("segment from data service is nil") for _, msg := range segMsg.Msgs {
} else if _, err := c.MetaTable.AddSegment(seg); err != nil { if msg.Type() != commonpb.MsgType_SegmentInfo {
//what if master add segment failed, but data service success? continue
log.Warn("add segment info meta table failed ", zap.String("error", err.Error())) }
} else { segInfoMsg := msg.(*ms.SegmentInfoMsg)
log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.ID)) segInfos = append(segInfos, segInfoMsg.Segment)
}
if len(segInfos) > 0 {
startPosByte, err := json.Marshal(segMsg.StartPositions)
if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error()))
continue
}
endPosByte, err := json.Marshal(segMsg.EndPositions)
if err != nil {
log.Error("json.Marshal fail", zap.String("err", err.Error()))
continue
}
if _, err := c.MetaTable.AddSegment(segInfos, string(startPosByte), string(endPosByte)); err != nil {
//what if master add segment failed, but data service success?
log.Debug("add segment info meta table failed ", zap.String("error", err.Error()))
continue
}
} }
} }
} }
} }
func (c *Core) startSegmentFlushCompletedLoop() { // data node will put msg in this channel when flush segment
func (c *Core) startDataNodeFlushedSegmentLoop() {
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
log.Debug("close segment flush completed loop") log.Debug("close segment flush completed loop")
return return
case segID, ok := <-c.DataNodeSegmentFlushCompletedChan: case segMsg, ok := <-c.DataNodeFlushedSegmentChan:
if !ok { if !ok {
log.Debug("data node segment flush completed chan has closed, exit loop") log.Debug("data node segment flush completed chan has closed, exit loop")
return
} }
log.Debug("flush segment", zap.Int64("id", segID))
coll, err := c.MetaTable.GetCollectionBySegmentID(segID) startPosByte, err := json.Marshal(segMsg.StartPositions)
if err != nil { if err != nil {
log.Warn("GetCollectionBySegmentID error", zap.Error(err)) log.Error("json.Marshal fail", zap.String("err", err.Error()))
break continue
} }
err = c.MetaTable.AddFlushedSegment(segID) endPosByte, err := json.Marshal(segMsg.EndPositions)
if err != nil { if err != nil {
log.Warn("AddFlushedSegment error", zap.Error(err)) log.Error("json.Marshal fail", zap.String("err", err.Error()))
continue
} }
for _, f := range coll.FieldIndexes {
idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID) var segIdxInfos []*etcdpb.SegmentIndexInfo
if err != nil { for _, msg := range segMsg.Msgs {
log.Warn("index not found", zap.Int64("index id", f.IndexID)) // check msg type
if msg.Type() != commonpb.MsgType_SegmentFlushDone {
continue continue
} }
flushMsg := msg.(*ms.FlushCompletedMsg)
segID := flushMsg.SegmentID
log.Debug("flush segment", zap.Int64("id", segID))
fieldSch, err := GetFieldSchemaByID(coll, f.FiledID) coll, err := c.MetaTable.GetCollectionBySegmentID(segID)
if err != nil {
log.Warn("GetCollectionBySegmentID error", zap.Error(err))
continue
}
err = c.MetaTable.AddFlushedSegment(segID)
if err != nil { if err != nil {
log.Warn("field schema not found", zap.Int64("field id", f.FiledID)) log.Warn("AddFlushedSegment error", zap.Error(err))
continue continue
} }
if err = c.BuildIndex(segID, fieldSch, idxInfo, true); err != nil { for _, f := range coll.FieldIndexes {
log.Error("build index fail", zap.String("error", err.Error())) fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
} else { if err != nil {
log.Debug("build index", zap.String("index name", idxInfo.IndexName), log.Warn("field schema not found", zap.Int64("field id", f.FiledID))
zap.String("field name", fieldSch.Name), continue
zap.Int64("segment id", segID)) }
idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID)
if err != nil {
log.Warn("index not found", zap.Int64("index id", f.IndexID))
continue
}
info := etcdpb.SegmentIndexInfo{
SegmentID: segID,
FieldID: fieldSch.FieldID,
IndexID: idxInfo.IndexID,
EnableIndex: false,
}
info.BuildID, err = c.BuildIndex(segID, fieldSch, idxInfo, true)
if err == nil {
info.EnableIndex = true
} else {
log.Error("build index fail", zap.String("error", err.Error()))
}
segIdxInfos = append(segIdxInfos, &info)
} }
} }
_, err = c.MetaTable.AddIndex(segIdxInfos, string(startPosByte), string(endPosByte))
if err != nil {
log.Error("AddIndex fail", zap.String("err", err.Error()))
}
} }
} }
} }
...@@ -583,45 +639,25 @@ func (c *Core) setMsgStreams() error { ...@@ -583,45 +639,25 @@ func (c *Core) setMsgStreams() error {
} }
}() }()
//segment channel, data service create segment,or data node flush segment will put msg in this channel
if Params.DataServiceSegmentChannel == "" { if Params.DataServiceSegmentChannel == "" {
return fmt.Errorf("DataServiceSegmentChannel is empty") return fmt.Errorf("DataServiceSegmentChannel is empty")
} }
dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx)
dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + Params.MsgChannelSubName)
dataServiceStream.Start()
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024)
// receive segment info from msg stream // data service will put msg into this channel when create segment
go func() { dsStream, _ := c.msFactory.NewMsgStream(c.ctx)
for { dsSubName := Params.MsgChannelSubName + "ds"
select { dsStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dsSubName)
case <-c.ctx.Done(): log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dsSubName)
return dsStream.Start()
case segMsg, ok := <-dataServiceStream.Chan(): c.DataServiceSegmentChan = dsStream.Chan()
if !ok {
log.Warn("data service segment msg closed") // data node will put msg into this channel when flush segment
} dnStream, _ := c.msFactory.NewMsgStream(c.ctx)
if len(segMsg.Msgs) > 0 { dnSubName := Params.MsgChannelSubName + "dn"
for _, segm := range segMsg.Msgs { dnStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dnSubName)
segInfoMsg, ok := segm.(*ms.SegmentInfoMsg) log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dnSubName)
if ok { dnStream.Start()
c.DataServiceSegmentChan <- segInfoMsg.Segment c.DataNodeFlushedSegmentChan = dnStream.Chan()
} else {
flushMsg, ok := segm.(*ms.FlushCompletedMsg)
if ok {
c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID
} else {
log.Debug("receive unexpected msg from data service stream", zap.Stringer("segment", segInfoMsg.SegmentMsg.Segment))
}
}
}
}
}
}
}()
return nil return nil
} }
...@@ -784,38 +820,31 @@ func (c *Core) SetQueryService(s types.QueryService) error { ...@@ -784,38 +820,31 @@ func (c *Core) SetQueryService(s types.QueryService) error {
} }
// BuildIndex will check row num and call build index service // BuildIndex will check row num and call build index service
func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) error { func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) (typeutil.UniqueID, error) {
if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) { if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
return nil return 0, nil
} }
rows, err := c.GetNumRowsReq(segID, isFlush) rows, err := c.GetNumRowsReq(segID, isFlush)
if err != nil { if err != nil {
return err return 0, err
} }
var bldID typeutil.UniqueID var bldID typeutil.UniqueID
enableIdx := false
if rows < Params.MinSegmentSizeToEnableIndex { if rows < Params.MinSegmentSizeToEnableIndex {
log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows)) log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
} else { } else {
binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID) binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID)
if err != nil { if err != nil {
return err return 0, err
} }
bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo) bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo)
if err != nil { if err != nil {
return err return 0, err
} }
enableIdx = true
}
seg := etcdpb.SegmentIndexInfo{
SegmentID: segID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
BuildID: bldID,
EnableIndex: enableIdx,
} }
_, err = c.MetaTable.AddIndex(&seg) log.Debug("build index", zap.String("index name", idxInfo.IndexName),
return err zap.String("field name", field.Name),
zap.Int64("segment id", segID))
return bldID, nil
} }
func (c *Core) Init() error { func (c *Core) Init() error {
...@@ -980,7 +1009,7 @@ func (c *Core) Start() error { ...@@ -980,7 +1009,7 @@ func (c *Core) Start() error {
go c.startDdScheduler() go c.startDdScheduler()
go c.startTimeTickLoop() go c.startTimeTickLoop()
go c.startDataServiceSegmentLoop() go c.startDataServiceSegmentLoop()
go c.startSegmentFlushCompletedLoop() go c.startDataNodeFlushedSegmentLoop()
go c.tsLoop() go c.tsLoop()
go c.chanTimeTick.StartWatch() go c.chanTimeTick.StartWatch()
c.stateCode.Store(internalpb.StateCode_Healthy) c.stateCode.Store(internalpb.StateCode_Healthy)
......
...@@ -1919,7 +1919,7 @@ func TestCheckInit(t *testing.T) { ...@@ -1919,7 +1919,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit() err = c.checkInit()
assert.NotNil(t, err) assert.NotNil(t, err)
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo) c.DataServiceSegmentChan = make(chan *msgstream.MsgPack)
err = c.checkInit() err = c.checkInit()
assert.NotNil(t, err) assert.NotNil(t, err)
...@@ -1953,7 +1953,7 @@ func TestCheckInit(t *testing.T) { ...@@ -1953,7 +1953,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit() err = c.checkInit()
assert.NotNil(t, err) assert.NotNil(t, err)
c.DataNodeSegmentFlushCompletedChan = make(chan int64) c.DataNodeFlushedSegmentChan = make(chan *msgstream.MsgPack)
err = c.checkInit() err = c.checkInit()
assert.NotNil(t, err) assert.NotNil(t, err)
......
...@@ -41,6 +41,9 @@ const ( ...@@ -41,6 +41,9 @@ const (
TimestampPrefix = ComponentPrefix + "/timestamp" TimestampPrefix = ComponentPrefix + "/timestamp"
MsgStartPositionPrefix = ComponentPrefix + "/msg-start-position"
MsgEndPositionPrefix = ComponentPrefix + "/msg-end-position"
DDOperationPrefix = ComponentPrefix + "/dd-operation" DDOperationPrefix = ComponentPrefix + "/dd-operation"
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
...@@ -689,45 +692,55 @@ func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID type ...@@ -689,45 +692,55 @@ func (mt *metaTable) GetPartitionByID(collID typeutil.UniqueID, partitionID type
} }
func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, error) { func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error) {
mt.ddLock.Lock() mt.ddLock.Lock()
defer mt.ddLock.Unlock() defer mt.ddLock.Unlock()
collMeta, ok := mt.collID2Meta[seg.CollectionID]
if !ok { meta := make(map[string]string)
return 0, fmt.Errorf("can't find collection id = %d", seg.CollectionID) for _, segInfo := range segInfos {
} collMeta, ok := mt.collID2Meta[segInfo.CollectionID]
partMeta, ok := mt.partitionID2Meta[seg.PartitionID] if !ok {
if !ok { return 0, fmt.Errorf("can't find collection id = %d", segInfo.CollectionID)
return 0, fmt.Errorf("can't find partition id = %d", seg.PartitionID)
}
exist := false
for _, partID := range collMeta.PartitionIDs {
if partID == seg.PartitionID {
exist = true
break
} }
} partMeta, ok := mt.partitionID2Meta[segInfo.PartitionID]
if !exist { if !ok {
return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID) return 0, fmt.Errorf("can't find partition id = %d", segInfo.PartitionID)
}
exist = false
for _, segID := range partMeta.SegmentIDs {
if segID == seg.ID {
exist = true
} }
exist := false
for _, partID := range collMeta.PartitionIDs {
if partID == segInfo.PartitionID {
exist = true
break
}
}
if !exist {
return 0, fmt.Errorf("partition id = %d, not belong to collection id = %d", segInfo.PartitionID, segInfo.CollectionID)
}
exist = false
for _, segID := range partMeta.SegmentIDs {
if segID == segInfo.ID {
exist = true
}
}
if exist {
return 0, fmt.Errorf("segment id = %d exist", segInfo.ID)
}
partMeta.SegmentIDs = append(partMeta.SegmentIDs, segInfo.ID)
mt.partitionID2Meta[segInfo.PartitionID] = partMeta
mt.segID2CollID[segInfo.ID] = segInfo.CollectionID
mt.segID2PartitionID[segInfo.ID] = segInfo.PartitionID
k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, segInfo.CollectionID, segInfo.PartitionID)
v := proto.MarshalTextString(&partMeta)
meta[k] = v
} }
if exist {
return 0, fmt.Errorf("segment id = %d exist", seg.ID)
}
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.ID)
mt.partitionID2Meta[seg.PartitionID] = partMeta
mt.segID2CollID[seg.ID] = seg.CollectionID
mt.segID2PartitionID[seg.ID] = seg.PartitionID
k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID)
v := proto.MarshalTextString(&partMeta)
ts, err := mt.client.Save(k, v) if msgStartPos != "" && msgEndPos != "" {
meta[MsgStartPositionPrefix] = msgStartPos
meta[MsgEndPositionPrefix] = msgEndPos
}
ts, err := mt.client.MultiSave(meta, nil)
if err != nil { if err != nil {
_ = mt.reloadFromKV() _ = mt.reloadFromKV()
return 0, err return 0, err
...@@ -735,62 +748,72 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, er ...@@ -735,62 +748,72 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) (typeutil.Timestamp, er
return ts, nil return ts, nil
} }
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) { func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos string, msgEndPos string) (typeutil.Timestamp, error) {
mt.ddLock.Lock() mt.ddLock.Lock()
defer mt.ddLock.Unlock() defer mt.ddLock.Unlock()
collID, ok := mt.segID2CollID[segIdxInfo.SegmentID] meta := make(map[string]string)
if !ok {
return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID) for _, segIdxInfo := range segIdxInfos {
} collID, ok := mt.segID2CollID[segIdxInfo.SegmentID]
collMeta, ok := mt.collID2Meta[collID] if !ok {
if !ok { return 0, fmt.Errorf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)
return 0, fmt.Errorf("collection id = %d not found", collID) }
} collMeta, ok := mt.collID2Meta[collID]
partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID] if !ok {
if !ok { return 0, fmt.Errorf("collection id = %d not found", collID)
return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID) }
} partID, ok := mt.segID2PartitionID[segIdxInfo.SegmentID]
exist := false if !ok {
for _, fidx := range collMeta.FieldIndexes { return 0, fmt.Errorf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)
if fidx.IndexID == segIdxInfo.IndexID { }
exist = true exist := false
break for _, fidx := range collMeta.FieldIndexes {
if fidx.IndexID == segIdxInfo.IndexID {
exist = true
break
}
}
if !exist {
return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
} }
}
if !exist {
return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
}
segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID] segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
if !ok { if !ok {
idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo} idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap mt.segID2IndexMeta[segIdxInfo.SegmentID] = &idxMap
} else { } else {
tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID] tmpInfo, ok := (*segIdxMap)[segIdxInfo.IndexID]
if ok { if ok {
if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) { if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID)) log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
return 0, nil return 0, nil
}
return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
} }
return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
} }
if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
mt.flushedSegID[segIdxInfo.SegmentID] = true
}
(*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID)
v := proto.MarshalTextString(segIdxInfo)
meta[k] = v
} }
(*(mt.segID2IndexMeta[segIdxInfo.SegmentID]))[segIdxInfo.IndexID] = *segIdxInfo if msgStartPos != "" && msgEndPos != "" {
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, collID, segIdxInfo.IndexID, partID, segIdxInfo.SegmentID) meta[MsgStartPositionPrefix] = msgStartPos
v := proto.MarshalTextString(segIdxInfo) meta[MsgEndPositionPrefix] = msgEndPos
}
ts, err := mt.client.Save(k, v) ts, err := mt.client.MultiSave(meta, nil)
if err != nil { if err != nil {
_ = mt.reloadFromKV() _ = mt.reloadFromKV()
return 0, err return 0, err
} }
if _, ok := mt.flushedSegID[segIdxInfo.SegmentID]; !ok {
mt.flushedSegID[segIdxInfo.SegmentID] = true
}
return ts, nil return ts, nil
} }
......
...@@ -301,29 +301,29 @@ func TestMetaTable(t *testing.T) { ...@@ -301,29 +301,29 @@ func TestMetaTable(t *testing.T) {
}) })
t.Run("add segment", func(t *testing.T) { t.Run("add segment", func(t *testing.T) {
seg := &datapb.SegmentInfo{ segInfo := &datapb.SegmentInfo{
ID: segID, ID: segID,
CollectionID: collID, CollectionID: collID,
PartitionID: partID, PartitionID: partID,
} }
_, err := mt.AddSegment(seg) _, err := mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
seg.ID = segID2 segInfo.ID = segID2
seg.CollectionID = collIDInvalid segInfo.CollectionID = collIDInvalid
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
seg.CollectionID = collID segInfo.CollectionID = collID
seg.PartitionID = partIDInvalid segInfo.PartitionID = partIDInvalid
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
seg.PartitionID = partID segInfo.PartitionID = partID
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
}) })
...@@ -334,15 +334,15 @@ func TestMetaTable(t *testing.T) { ...@@ -334,15 +334,15 @@ func TestMetaTable(t *testing.T) {
IndexID: indexID, IndexID: indexID,
BuildID: buildID, BuildID: buildID,
} }
_, err := mt.AddIndex(&segIdxInfo) _, err := mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
// it's legal to add index twice // it's legal to add index twice
_, err = mt.AddIndex(&segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
segIdxInfo.BuildID = 202 segIdxInfo.BuildID = 202
_, err = mt.AddIndex(&segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID)) assert.EqualError(t, err, fmt.Sprintf("index id = %d exist", segIdxInfo.IndexID))
}) })
...@@ -529,12 +529,12 @@ func TestMetaTable(t *testing.T) { ...@@ -529,12 +529,12 @@ func TestMetaTable(t *testing.T) {
_, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil) _, err := mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err) assert.Nil(t, err)
seg := &datapb.SegmentInfo{ segInfo := &datapb.SegmentInfo{
ID: 100, ID: 100,
CollectionID: collID, CollectionID: collID,
PartitionID: partID, PartitionID: partID,
} }
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
mt.collID2Meta = make(map[int64]pb.CollectionInfo) mt.collID2Meta = make(map[int64]pb.CollectionInfo)
...@@ -542,14 +542,14 @@ func TestMetaTable(t *testing.T) { ...@@ -542,14 +542,14 @@ func TestMetaTable(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection: %s", collInfo.Schema.Name)) assert.EqualError(t, err, fmt.Sprintf("can't find collection: %s", collInfo.Schema.Name))
_, err = mt.GetCollectionBySegmentID(seg.ID) _, err = mt.GetCollectionBySegmentID(segInfo.ID)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find collection id: %d", collInfo.ID)) assert.EqualError(t, err, fmt.Sprintf("can't find collection id: %d", collInfo.ID))
mt.segID2CollID = make(map[int64]int64) mt.segID2CollID = make(map[int64]int64)
_, err = mt.GetCollectionBySegmentID(seg.ID) _, err = mt.GetCollectionBySegmentID(segInfo.ID)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id %d not belong to any collection", seg.ID)) assert.EqualError(t, err, fmt.Sprintf("segment id %d not belong to any collection", segInfo.ID))
}) })
t.Run("add partition failed", func(t *testing.T) { t.Run("add partition failed", func(t *testing.T) {
...@@ -686,24 +686,24 @@ func TestMetaTable(t *testing.T) { ...@@ -686,24 +686,24 @@ func TestMetaTable(t *testing.T) {
} }
mt.partitionID2Meta[noPart.PartitionID] = noPart mt.partitionID2Meta[noPart.PartitionID] = noPart
seg := &datapb.SegmentInfo{ segInfo := &datapb.SegmentInfo{
ID: 100, ID: 100,
CollectionID: collInfo.ID, CollectionID: collInfo.ID,
PartitionID: noPart.PartitionID, PartitionID: noPart.PartitionID,
} }
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", seg.PartitionID, seg.CollectionID)) assert.EqualError(t, err, fmt.Sprintf("partition id = %d, not belong to collection id = %d", segInfo.PartitionID, segInfo.CollectionID))
seg = &datapb.SegmentInfo{ segInfo = &datapb.SegmentInfo{
ID: 11, ID: 11,
CollectionID: collInfo.ID, CollectionID: collInfo.ID,
PartitionID: partInfo.PartitionID, PartitionID: partInfo.PartitionID,
} }
mockKV.save = func(key, value string) (typeutil.Timestamp, error) { mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save error") return 0, fmt.Errorf("save error")
} }
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "save error") assert.EqualError(t, err, "save error")
}) })
...@@ -725,36 +725,36 @@ func TestMetaTable(t *testing.T) { ...@@ -725,36 +725,36 @@ func TestMetaTable(t *testing.T) {
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err) assert.Nil(t, err)
seg := &datapb.SegmentInfo{ segInfo := &datapb.SegmentInfo{
ID: 100, ID: 100,
CollectionID: collID, CollectionID: collID,
PartitionID: partID, PartitionID: partID,
} }
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
segIdxInfo := &pb.SegmentIndexInfo{ segIdxInfo := pb.SegmentIndexInfo{
SegmentID: segID, SegmentID: segID,
FieldID: fieldID, FieldID: fieldID,
IndexID: indexID2, IndexID: indexID2,
BuildID: buildID, BuildID: buildID,
} }
_, err = mt.AddIndex(segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID)) assert.EqualError(t, err, fmt.Sprintf("index id = %d not found", segIdxInfo.IndexID))
mt.segID2PartitionID = make(map[int64]int64) mt.segID2PartitionID = make(map[int64]int64)
_, err = mt.AddIndex(segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID)) assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any partition", segIdxInfo.SegmentID))
mt.collID2Meta = make(map[int64]pb.CollectionInfo) mt.collID2Meta = make(map[int64]pb.CollectionInfo)
_, err = mt.AddIndex(segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID)) assert.EqualError(t, err, fmt.Sprintf("collection id = %d not found", collInfo.ID))
mt.segID2CollID = make(map[int64]int64) mt.segID2CollID = make(map[int64]int64)
_, err = mt.AddIndex(segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID)) assert.EqualError(t, err, fmt.Sprintf("segment id = %d not belong to any collection", segIdxInfo.SegmentID))
...@@ -764,14 +764,14 @@ func TestMetaTable(t *testing.T) { ...@@ -764,14 +764,14 @@ func TestMetaTable(t *testing.T) {
collInfo.PartitionIDs = nil collInfo.PartitionIDs = nil
_, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil) _, err = mt.AddCollection(collInfo, partInfo, idxInfo, nil)
assert.Nil(t, err) assert.Nil(t, err)
_, err = mt.AddSegment(seg) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
segIdxInfo.IndexID = indexID segIdxInfo.IndexID = indexID
mockKV.save = func(key, value string) (typeutil.Timestamp, error) { mockKV.multiSave = func(kvs map[string]string, addition func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("save error") return 0, fmt.Errorf("save error")
} }
_, err = mt.AddIndex(segIdxInfo) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, "save error") assert.EqualError(t, err, "save error")
}) })
...@@ -872,27 +872,27 @@ func TestMetaTable(t *testing.T) { ...@@ -872,27 +872,27 @@ func TestMetaTable(t *testing.T) {
CollectionID: collID, CollectionID: collID,
PartitionID: partID, PartitionID: partID,
} }
_, err = mt.AddSegment(segInfo) _, err = mt.AddSegment([]*datapb.SegmentInfo{segInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
segIdx := &pb.SegmentIndexInfo{ segIdxInfo := pb.SegmentIndexInfo{
SegmentID: segID, SegmentID: segID,
FieldID: fieldID, FieldID: fieldID,
IndexID: indexID, IndexID: indexID,
BuildID: buildID, BuildID: buildID,
} }
_, err = mt.AddIndex(segIdx) _, err = mt.AddIndex([]*pb.SegmentIndexInfo{&segIdxInfo}, "", "")
assert.Nil(t, err) assert.Nil(t, err)
idx, err := mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, idxInfo[0].IndexName) idx, err := mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, idxInfo[0].IndexName)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, segIdx.IndexID, idx.IndexID) assert.Equal(t, segIdxInfo.IndexID, idx.IndexID)
_, err = mt.GetSegmentIndexInfoByID(segIdx.SegmentID, segIdx.FieldID, "abc") _, err = mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, segIdxInfo.FieldID, "abc")
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find index name = abc on segment = %d, with filed id = %d", segIdx.SegmentID, segIdx.FieldID)) assert.EqualError(t, err, fmt.Sprintf("can't find index name = abc on segment = %d, with filed id = %d", segIdxInfo.SegmentID, segIdxInfo.FieldID))
_, err = mt.GetSegmentIndexInfoByID(segIdx.SegmentID, 11, idxInfo[0].IndexName) _, err = mt.GetSegmentIndexInfoByID(segIdxInfo.SegmentID, 11, idxInfo[0].IndexName)
assert.NotNil(t, err) assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("can't find index name = %s on segment = %d, with filed id = 11", idxInfo[0].IndexName, segIdx.SegmentID)) assert.EqualError(t, err, fmt.Sprintf("can't find index name = %s on segment = %d, with filed id = 11", idxInfo[0].IndexName, segIdxInfo.SegmentID))
}) })
t.Run("get field schema failed", func(t *testing.T) { t.Run("get field schema failed", func(t *testing.T) {
......
...@@ -746,15 +746,24 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { ...@@ -746,15 +746,24 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector { if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector {
return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)]) return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)])
} }
var segIdxInfos []*etcdpb.SegmentIndexInfo
for _, segID := range segIDs { for _, segID := range segIDs {
if err := t.core.BuildIndex(segID, &field, idxInfo, false); err != nil { info := etcdpb.SegmentIndexInfo{
SegmentID: segID,
FieldID: field.FieldID,
IndexID: idxInfo.IndexID,
EnableIndex: false,
}
info.BuildID, err = t.core.BuildIndex(segID, &field, idxInfo, false)
if err != nil {
return err return err
} }
log.Debug("build index", zap.String("index name", idxInfo.IndexName), segIdxInfos = append(segIdxInfos, &info)
zap.String("field name", field.Name),
zap.Int64("segment id", segID))
} }
return nil
_, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "")
return err
} }
type DescribeIndexReqTask struct { type DescribeIndexReqTask struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册