提交 6ef82a59 编写于 作者: D dragondriver 提交者: yefu.chen

Fix the bug when close the message stream of InsertTask

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 0f620ab1
......@@ -177,8 +177,10 @@ func (m *InsertChannelsMap) closeAllMsgStream() {
m.mtx.Lock()
defer m.mtx.Unlock()
for _, stream := range m.insertMsgStreams {
stream.Close()
for loc, stream := range m.insertMsgStreams {
if m.droppedBitMap[loc] == 0 && m.usageHistogram[loc] >= 1 {
stream.Close()
}
}
m.collectionID2InsertChannels = make(map[UniqueID]int)
......
......@@ -24,7 +24,7 @@ type NaiveNodeIDAllocatorImpl struct {
func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
allocator.mtx.Lock()
defer func() {
allocator.now++
// allocator.now++
allocator.mtx.Unlock()
}()
return allocator.now
......@@ -32,6 +32,6 @@ func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
func NewNodeIDAllocator() NodeIDAllocator {
return &NaiveNodeIDAllocatorImpl{
now: 0,
now: 1,
}
}
......@@ -83,8 +83,8 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
case ttmsgs := <-ttBarrier.ttStream.Chan():
//log.Println("ttmsgs: ", ttmsgs)
ttBarrier.peerMtx.RLock()
log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt)
log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs))
//log.Println("peer2LastTt map: ", ttBarrier.peer2LastTt)
//log.Println("len(ttmsgs.Msgs): ", len(ttmsgs.Msgs))
if len(ttmsgs.Msgs) > 0 {
for _, timetickmsg := range ttmsgs.Msgs {
ttmsg := timetickmsg.(*ms.TimeTickMsg)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册