提交 6c02ae4e 编写于 作者: X XuanYang-cn 提交者: zhenshan.cao

Remove Old msgposition logic (#5487)

Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 c213dc46
......@@ -36,16 +36,12 @@ type Replica interface {
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
setIsFlushed(segmentID UniqueID) error
setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error
setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
getSegmentByID(segmentID UniqueID) (*Segment, error)
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
getChannelName(segID UniqueID) (string, error)
//new msg postions
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
......@@ -59,14 +55,8 @@ type Segment struct {
numRows int64
memorySize int64
isNew atomic.Value // bool
isFlushed bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb.MsgPosition
endPosition *internalpb.MsgPosition // not using
channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
// CollectionSegmentReplica is the data replication of persistent data in datanode.
......@@ -157,8 +147,6 @@ func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Se
// `addSegment` add a new segment into replica when data node see the segment
// for the first time in insert channels. It sets the startPosition of a segment, and
// flags `isNew=true`
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
......@@ -169,20 +157,12 @@ func (replica *CollectionSegmentReplica) addSegment(
defer replica.mu.Unlock()
log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
position := &internalpb.MsgPosition{
ChannelName: channelName,
seg := &Segment{
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
isFlushed: false,
createTime: 0,
startPosition: position,
endPosition: new(internalpb.MsgPosition),
channelName: channelName,
field2Paths: make(map[UniqueID][]string),
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
channelName: channelName,
field2Paths: make(map[UniqueID][]string),
......@@ -208,48 +188,6 @@ func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
return ok
func (replica *CollectionSegmentReplica) setIsFlushed(segmentID UniqueID) error {
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
seg.isFlushed = true
return nil
return fmt.Errorf("There's no segment %v", segmentID)
func (replica *CollectionSegmentReplica) setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error {
defer replica.mu.RUnlock()
if startPos == nil {
return fmt.Errorf("Nil MsgPosition")
if seg, ok := replica.segments[segmentID]; ok {
seg.startPosition = startPos
return nil
return fmt.Errorf("There's no segment %v", segmentID)
func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error {
defer replica.mu.RUnlock()
if endPos == nil {
return fmt.Errorf("Nil MsgPosition")
if seg, ok := replica.segments[segmentID]; ok {
seg.endPosition = endPos
return nil
return fmt.Errorf("There's no segment %v", segmentID)
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
......@@ -266,8 +204,6 @@ func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, nu
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
// if the segment's flag `isNew` is true, updates will contain a valid start position.
// if the segment's flag `isFlushed` is true, updates will contain a valid end position.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
defer replica.mu.Unlock()
......@@ -279,15 +215,6 @@ func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID U
NumRows: seg.numRows,
if seg.isNew.Load() == true {
updates.StartPosition = seg.startPosition
if seg.isFlushed {
updates.EndPosition = seg.endPosition
return updates, nil
return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
......@@ -14,7 +14,6 @@ package datanode
import (
......@@ -135,8 +134,6 @@ func TestReplica_Segment(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), update.SegmentID)
assert.Equal(t, int64(100), update.NumRows)
assert.NotNil(t, update.StartPosition)
assert.Nil(t, update.EndPosition)
f2p := map[UniqueID]string{
1: "a",
......@@ -156,16 +153,8 @@ func TestReplica_Segment(t *testing.T) {
assert.ElementsMatch(t, []string{"a", "a"}, r[1])
assert.ElementsMatch(t, []string{"b", "b"}, r[2])
err = replica.setIsFlushed(0)
assert.NoError(t, err)
err = replica.setStartPosition(0, &internalpb.MsgPosition{})
assert.NoError(t, err)
err = replica.setEndPosition(0, &internalpb.MsgPosition{})
assert.NoError(t, err)
update, err = replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
assert.Nil(t, update.StartPosition)
assert.NotNil(t, update.EndPosition)
err = replica.removeSegment(0)
assert.NoError(t, err)
......@@ -180,17 +169,6 @@ func TestReplica_Segment(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, seg)
err = replica.setIsFlushed(0)
assert.Error(t, err)
err = replica.setStartPosition(0, &internalpb.MsgPosition{})
assert.Error(t, err)
err = replica.setStartPosition(0, nil)
assert.Error(t, err)
err = replica.setEndPosition(0, &internalpb.MsgPosition{})
assert.Error(t, err)
err = replica.setEndPosition(0, nil)
assert.Error(t, err)
err = replica.updateStatistics(0, 0)
assert.Error(t, err)
......@@ -141,7 +141,6 @@ func (node *DataNode) Init() error {
node.session = sessionutil.NewSession(ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
// TODO find DataService & MasterService
req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
SourceID: node.NodeID,
......@@ -195,12 +194,13 @@ func (node *DataNode) NewDataSyncService(vchanPair *datapb.VchannelPair) error {
replica := newReplica()
var alloc allocatorInterface = newAllocator(node.masterService)
metaService := newMetaService(node.ctx, replica, node.masterService)
flushChan := make(chan *flushMsg, 100)
dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchanPair)
// TODO metaService using timestamp in DescribeCollection
metaService := newMetaService(node.ctx, replica, node.masterService)
node.vchan2SyncService[vchanPair.GetDmlVchannelName()] = dataSyncService
node.vchan2FlushCh[vchanPair.GetDmlVchannelName()] = flushChan
......@@ -246,6 +246,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
// GetComponentStates will return current state of DataNode
func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
log.Debug("DataNode current state", zap.Any("State", node.State.Load()))
states := &internalpb.ComponentStates{
......@@ -271,29 +272,73 @@ func (node *DataNode) getChannelName(segID UniqueID) string {
return ""
// ReadyToFlush tells wether DataNode is ready for flushing
func (node *DataNode) ReadyToFlush() error {
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
return errors.New("DataNode not in HEALTHY state")
defer node.chanMut.RUnlock()
if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushCh) == 0 {
// Healthy but Idle
msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work"
return errors.New(msg)
if len(node.vchan2SyncService) != len(node.vchan2FlushCh) {
// TODO restart
msg := "DataNode HEALTHY but abnormal inside, restarting..."
return errors.New(msg)
return nil
func (node *DataNode) getSegmentPositionPair(segmentID UniqueID, chanName string) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
defer node.chanMut.Unlock()
sync, ok := node.vchan2SyncService[chanName]
if !ok {
return nil, nil
starts, ends := sync.replica.getSegmentPositions(segmentID)
return starts, ends
// FlushSegments packs flush messages into flowgraph through flushChan.
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
// There are 1 precondition: The segmentID in req is in ascending order.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs)))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
if err := node.ReadyToFlush(); err != nil {
status.Reason = err.Error()
return status, nil
log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs)))
for _, id := range req.SegmentIDs {
chanName := node.getChannelName(id)
log.Info("vchannel", zap.String("name", chanName))
if chanName == "" {
if len(chanName) == 0 {
status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
return status, errors.New(status.GetReason())
flushCh, ok := node.vchan2FlushCh[chanName]
if !ok {
// TODO restart DataNode or reshape vchan2FlushCh and vchan2SyncService
status.Reason = "DataNode abnormal!"
return status, errors.New(status.GetReason())
status.Reason = "DataNode abnormal, restarting"
return status, nil
ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
......@@ -310,7 +355,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
waitReceive := func(wg *sync.WaitGroup, flushedCh interface{}, req *datapb.SaveBinlogPathsRequest) {
defer wg.Done()
log.Info("Inside waitReceive")
log.Debug("Inside waitReceive")
switch Ch := flushedCh.(type) {
case chan []*datapb.ID2PathList:
select {
......@@ -324,6 +369,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
// Modify req with valid dml binlog paths
req.Field2BinlogPaths = meta
log.Info("Insert messeges flush done!", zap.Any("Binlog paths", meta))
......@@ -345,6 +391,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
// Modify req with valid ddl binlog paths
req.DdlBinlogPaths = meta
log.Info("Ddl messages flush done!", zap.Any("Binlog paths", meta))
......@@ -352,18 +399,19 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
// TODO make a queue for this func
currentSegID := id
req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{},
SegmentID: id,
CollectionID: req.CollectionID,
// TODO Set start_positions and end_positions
log.Info("Waiting for flush completed", zap.Int64("segmentID", id))
go func() {
flushCh <- flushmsg
log.Info("Waiting for flush completed", zap.Int64("segmentID", currentSegID))
req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{},
SegmentID: currentSegID,
CollectionID: req.CollectionID,
var wg sync.WaitGroup
go waitReceive(&wg, ddlFlushedCh, req)
......@@ -371,6 +419,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
go waitReceive(&wg, dmlFlushedCh, req)
log.Info("Notify DataService BinlogPaths and Positions")
status, err := node.dataService.SaveBinlogPaths(node.ctx, req)
if err != nil {
log.Error("DataNode or DataService abnormal, restarting DataNode")
......@@ -385,7 +434,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
log.Info("Flush Completed", zap.Int64("segmentID", currentSegID))
......@@ -135,7 +135,7 @@ func TestDataNode(t *testing.T) {
SegmentIDs: []int64{0},
status, err := node1.FlushSegments(node.ctx, req)
status, err := node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
......@@ -183,11 +183,16 @@ func TestDataNode(t *testing.T) {
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
_, err = sync.replica.getSegmentByID(0)
assert.NoError(t, err)
defer func() {
......@@ -139,28 +139,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Error("add segment wrong", zap.Error(err))
switch {
case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0:
log.Error("insert Msg StartPosition empty")
segment, err := ibNode.replica.getSegmentByID(currentSegID)
if err != nil {
log.Error("get segment wrong", zap.Error(err))
var startPosition *internalpb.MsgPosition = nil
for _, pos := range iMsg.startPositions {
if pos.ChannelName == segment.channelName {
startPosition = pos
if startPosition == nil {
log.Error("get position wrong", zap.Error(err))
} else {
ibNode.replica.setStartPosition(currentSegID, startPosition)
// set msg pack start positions, new design
// set msg pack start positions
ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
......@@ -461,27 +440,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
switch {
case iMsg.endPositions == nil || len(iMsg.endPositions) <= 0:
log.Error("insert Msg EndPosition empty")
segment, err := ibNode.replica.getSegmentByID(currentSegID)
if err != nil {
log.Error("get segment wrong", zap.Error(err))
var endPosition *internalpb.MsgPosition = nil
for _, pos := range iMsg.endPositions {
if pos.ChannelName == segment.channelName {
endPosition = pos
if endPosition == nil {
log.Error("get position wrong", zap.Error(err))
ibNode.replica.setEndPosition(currentSegID, endPosition)
// store current startPositions as Segment->EndPostion
// store current endPositions as Segment->EndPostion
ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
......@@ -717,7 +676,6 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un
dmlFlushedCh <- binlogPaths
log.Debug(".. Segment flush completed ..")
......@@ -870,8 +828,7 @@ func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream
segmentStatisticsStream: segStatisticsMsgStream,
completeFlushStream: completeFlushStream,
replica: replica,
// flushMeta: flushMeta,
flushMap: sync.Map{},
idAllocator: idAllocator,
flushMap: sync.Map{},
idAllocator: idAllocator,
......@@ -110,6 +110,7 @@ func newHEALTHDataNodeMock(dmChannelName, ddChannelName string) *DataNode {
DdlPosition: &datapb.PositionPair{},
DmlPosition: &datapb.PositionPair{},
_ = node.NewDataSyncService(vpair)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册