提交 91f5c480 编写于 作者: XuanYang-cn 提交者: yefu.chen

Fix datanode goroutine leak

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
上级 721cdb2f
......@@ -40,7 +40,7 @@ type DataNode struct {
masterService types.MasterService
dataService types.DataService
flushChan chan *flushMsg
flushChan chan<- *flushMsg
replica Replica
closer io.Closer
......@@ -135,9 +135,10 @@ func (node *DataNode) Init() error {
var alloc allocatorInterface = newAllocator(node.masterService)
chanSize := 100
node.flushChan = make(chan *flushMsg, chanSize)
flushChan := make(chan *flushMsg, chanSize)
node.flushChan = flushChan
node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc, node.msFactory)
node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory)
node.dataSyncService.init()
node.metaService = newMetaService(node.ctx, replica, node.masterService)
......
......@@ -237,11 +237,11 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
}
collection := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &sch,
CreateTime: Timestamp(1),
SegmentIDs: make([]UniqueID, 0),
PartitionTags: make([]string, 0),
ID: collectionID,
Schema: &sch,
CreateTime: Timestamp(1),
SegmentIDs: make([]UniqueID, 0),
PartitionIDs: []UniqueID{0},
}
return &collection
}
......
......@@ -17,13 +17,13 @@ import (
type dataSyncService struct {
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
flushChan chan *flushMsg
flushChan <-chan *flushMsg
replica Replica
idAllocator allocatorInterface
msFactory msgstream.Factory
}
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg,
replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
service := &dataSyncService{
ctx: ctx,
......
......@@ -29,7 +29,7 @@ type ddNode struct {
ddRecords *ddRecords
ddBuffer *ddBuffer
flushMap *sync.Map
inFlushCh chan *flushMsg
inFlushCh <-chan *flushMsg
kv kv.Base
replica Replica
......@@ -429,7 +429,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
}
func newDDNode(ctx context.Context, binlogMeta *binlogMeta,
inFlushCh chan *flushMsg, replica Replica) *ddNode {
inFlushCh <-chan *flushMsg, replica Replica) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -137,6 +137,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, pos := range iMsg.startPositions {
if pos.ChannelName == segment.channelName {
startPosition = pos
break
}
}
if startPosition == nil {
......@@ -457,6 +458,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if ibNode.insertBuffer.full(currentSegID) {
log.Debug(". Insert Buffer full, auto flushing ",
zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
if err != nil {
log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
......@@ -516,16 +518,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug(".. Buffer not empty, flushing ..")
ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
delete(ibNode.insertBuffer.insertData, currentSegID)
clearFn := func() {
finishCh <- false
log.Debug(".. Clearing flush Buffer ..")
ibNode.flushMap.Delete(currentSegID)
}
seg, err := ibNode.replica.getSegmentByID(currentSegID)
if err != nil {
log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
clearFn()
continue
}
collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID)
if err != nil {
log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
clearFn()
continue
}
......@@ -557,10 +566,11 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
insertData *sync.Map, meta *binlogMeta, kv kv.Base, finishCh chan<- bool) {
defer func() {
clearFn := func(isSuccess bool) {
finishCh <- isSuccess
log.Debug(".. Clearing flush Buffer ..")
insertData.Delete(segID)
}()
}
inCodec := storage.NewInsertCodec(collMeta)
......@@ -568,14 +578,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
data, ok := insertData.Load(segID)
if !ok {
log.Error("Flush failed ... cannot load insertData ..")
finishCh <- false
clearFn(false)
return
}
binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
if err != nil {
log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
finishCh <- false
clearFn(false)
return
}
......@@ -587,14 +597,14 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
finishCh <- false
clearFn(false)
return
}
k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
if err != nil {
log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
finishCh <- false
clearFn(false)
return
}
......@@ -608,7 +618,7 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
if err != nil {
log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
_ = kv.MultiRemove(paths)
finishCh <- false
clearFn(false)
return
}
......@@ -617,12 +627,11 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
if err != nil {
log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
_ = kv.MultiRemove(paths)
finishCh <- false
clearFn(false)
return
}
finishCh <- true
clearFn(true)
}
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
......
......@@ -40,6 +40,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
replica := newReplica()
err = replica.addCollection(collMeta.ID, collMeta.Schema)
require.NoError(t, err)
err = replica.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
require.NoError(t, err)
msFactory := pulsarms.NewFactory()
m := map[string]interface{}{
......@@ -64,7 +66,7 @@ func genInsertMsg() insertMsg {
startPos := []*internalpb.MsgPosition{
{
ChannelName: "aaa",
ChannelName: Params.InsertChannelNames[0],
MsgID: make([]byte, 0),
Timestamp: 0,
},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册