未验证 提交 10a6f7f1 编写于 作者: C congqixia 提交者: GitHub

Defer buf removal from insertBuf after SaveBinlog succeeds (#8930)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 9cbf157a
...@@ -261,7 +261,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { ...@@ -261,7 +261,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
} }
} }
// Manul Flush // Manual Flush
select { select {
case fmsg := <-ibNode.flushChan: case fmsg := <-ibNode.flushChan:
currentSegID := fmsg.segmentID currentSegID := fmsg.segmentID
...@@ -287,7 +287,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { ...@@ -287,7 +287,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
finishCh := make(chan segmentFlushUnit, 1) finishCh := make(chan segmentFlushUnit, 1)
// Since buffer is not empty, so there must be data for key currentSegID // Since buffer is not empty, so there must be data for key currentSegID
bd, _ := ibNode.insertBuffer.LoadAndDelete(currentSegID) bd, _ := ibNode.insertBuffer.Load(currentSegID)
ibNode.flushMap.Store(currentSegID, bd.(*BufferData).buffer) ibNode.flushMap.Store(currentSegID, bd.(*BufferData).buffer)
clearFn := func() { clearFn := func() {
...@@ -324,9 +324,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { ...@@ -324,9 +324,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
log.Debug("Data service save binlog path failed", zap.Error(err)) log.Debug("Data service save binlog path failed", zap.Error(err))
} else { } else {
ibNode.replica.segmentFlushed(fu.segID) ibNode.replica.segmentFlushed(fu.segID)
ibNode.flushingSegCache.Remove(fu.segID) ibNode.insertBuffer.Delete(fu.segID)
} }
} }
//always remove from flushing seg cache
ibNode.flushingSegCache.Remove(fu.segID)
} }
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册