未验证 提交 e8138676 编写于 作者: C congqixia 提交者: GitHub

Implement Drop mode for flush manager (#12469)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 7ff15490
......@@ -561,4 +561,8 @@ func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...U
}()
}
func (mfm *mockFlushManager) notifyAllFlushed() {}
func (mfm *mockFlushManager) startDropping() {}
func (mfm *mockFlushManager) close() {}
......@@ -146,7 +146,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
// initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, flushNotifyFunc(dsService))
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica,
flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService))
// recover segment checkpoints
for _, us := range vchanInfo.GetUnflushedSegments() {
......
......@@ -205,7 +205,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
)
replica := genMockReplica(segIDs, pks, chanName)
kv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {})
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
t.Run("Test get segment by primary keys", func(te *testing.T) {
c := &nodeConfig{
replica: replica,
......
......@@ -83,7 +83,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {})
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
......@@ -180,7 +180,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {})
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
......@@ -395,7 +395,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
colRep.segmentFlushed(pack.segmentID)
}
wg.Done()
})
}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
......@@ -656,7 +656,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {})
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
......
......@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/atomic"
"go.uber.org/zap"
)
......@@ -42,6 +43,10 @@ type flushManager interface {
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
injectFlush(injection *taskInjection, segments ...UniqueID)
// startDropping changes flush manager into dropping mode
startDropping()
// notifyAllFlushed tells flush manager there is not future incoming flush task for drop mode
notifyAllFlushed()
// close handles resource clean up
close()
}
......@@ -61,6 +66,9 @@ type segmentFlushPack struct {
// notifyMetaFunc notify meta to persistent flush result
type notifyMetaFunc func(*segmentFlushPack)
// flushAndDropFunc notifies meta to flush current state and drop virtual channel
type flushAndDropFunc func([]*segmentFlushPack)
// taskPostFunc clean up function after single flush task done
type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc)
......@@ -70,6 +78,7 @@ type postInjectionFunc func(pack *segmentFlushPack)
// make sure implementation
var _ flushManager = (*rendezvousFlushManager)(nil)
// orderFlushQueue keeps the order of task notifyFunc execution in order
type orderFlushQueue struct {
sync.Once
segmentID UniqueID
......@@ -111,8 +120,9 @@ func (q *orderFlushQueue) init() {
func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flushTaskRunner {
actual, loaded := q.working.LoadOrStore(string(pos.MsgID), newFlushTaskRunner(q.segmentID, q.injectCh))
t := actual.(*flushTaskRunner)
// not loaded means the task runner is new, do initializtion
if !loaded {
// take over injection if task queue is handling it
q.injectMut.Lock()
q.runningTasks++
if q.injectHandler != nil {
......@@ -120,7 +130,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
q.injectHandler = nil
}
q.injectMut.Unlock()
// add task to tail
q.tailMut.Lock()
t.init(q.notifyFunc, q.postTask, q.tailCh)
q.tailCh = t.finishSignal
......@@ -129,13 +139,18 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
return t
}
// postTask handles clean up work after a task is done
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
// delete task from working map
q.working.Delete(string(pack.pos.MsgID))
// after descreasing working count, check whether flush queue is empty
q.injectMut.Lock()
q.runningTasks--
// if flush queue is empty, let flush queue take over injection
if q.runningTasks == 0 {
q.injectHandler = newInjectHandler(q)
}
// set postInjection function if injection is handled in task
if postInjection != nil {
q.postInjection = postInjection
}
......@@ -163,12 +178,14 @@ func (q *orderFlushQueue) inject(inject *taskInjection) {
q.injectCh <- inject
}
// injectionHandler handles injection for empty flush queue
type injectHandler struct {
once sync.Once
wg sync.WaitGroup
done chan struct{}
}
// newInjectHandler create injection handler for flush queue
func newInjectHandler(q *orderFlushQueue) *injectHandler {
h := &injectHandler{
done: make(chan struct{}),
......@@ -208,6 +225,14 @@ func (h *injectHandler) close() {
})
}
type dropHandler struct {
sync.Mutex
dropFlushWg sync.WaitGroup
flushAndDrop flushAndDropFunc
allFlushed chan struct{}
packs []*segmentFlushPack
}
// rendezvousFlushManager makes sure insert & del buf all flushed
type rendezvousFlushManager struct {
allocatorInterface
......@@ -217,9 +242,12 @@ type rendezvousFlushManager struct {
// segment id => flush queue
dispatcher sync.Map
notifyFunc notifyMetaFunc
dropping atomic.Bool
dropHandler dropHandler
}
// getFlushQueue
// getFlushQueue gets or creates a orderFlushQueue for segment id if not found
func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue {
newQueue := newOrderFlushQueue(segmentID, m.notifyFunc)
actual, _ := m.dispatcher.LoadOrStore(segmentID, newQueue)
......@@ -229,14 +257,64 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu
return queue
}
func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) {
// in dropping mode
if m.dropping.Load() {
r := &flushTaskRunner{
WaitGroup: sync.WaitGroup{},
segmentID: segmentID,
}
r.WaitGroup.Add(1) // insert and delete are not bound in drop mode
r.runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos)
r.WaitGroup.Wait()
m.dropHandler.Lock()
defer m.dropHandler.Unlock()
m.dropHandler.packs = append(m.dropHandler.packs, r.getFlushPack())
return
}
// normal mode
m.getFlushQueue(segmentID).enqueueInsertFlush(task, binlogs, statslogs, flushed, dropped, pos)
}
func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flushDeleteTask, deltaLogs *DelDataBuf, pos *internalpb.MsgPosition) {
// in dropping mode
if m.dropping.Load() {
// preventing separate delete, check position exists in queue first
q := m.getFlushQueue(segmentID)
_, ok := q.working.Load(string(pos.MsgID))
// if ok, means position insert data already in queue, just handle task in normal mode
// if not ok, means the insert buf should be handle in drop mode
if !ok {
r := &flushTaskRunner{
WaitGroup: sync.WaitGroup{},
segmentID: segmentID,
}
r.WaitGroup.Add(1) // insert and delete are not bound in drop mode
r.runFlushDel(task, deltaLogs)
r.WaitGroup.Wait()
m.dropHandler.Lock()
defer m.dropHandler.Unlock()
m.dropHandler.packs = append(m.dropHandler.packs, r.getFlushPack())
return
}
}
// normal mode
m.getFlushQueue(segmentID).enqueueDelFlush(task, deltaLogs, pos)
}
// notify flush manager insert buffer data
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
dropped bool, pos *internalpb.MsgPosition) error {
// empty flush
if data == nil || data.buffer == nil {
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos)
//m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
// map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos)
m.handleInsertTask(segmentID, &flushBufferInsertTask{}, map[UniqueID]string{}, map[UniqueID]string{},
flushed, dropped, pos)
return nil
}
......@@ -301,7 +379,12 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
}
m.updateSegmentCheckPoint(segmentID)
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{
/*
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{
BaseKV: m.BaseKV,
data: kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)*/
m.handleInsertTask(segmentID, &flushBufferInsertTask{
BaseKV: m.BaseKV,
data: kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)
......@@ -314,7 +397,10 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
// del signal with empty data
if data == nil || data.delData == nil {
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{}, nil, pos)
/*
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{}, nil, pos)
*/
m.handleDeleteTask(segmentID, &flushBufferDeleteTask{}, nil, pos)
return nil
}
......@@ -342,8 +428,12 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
data.fileSize = int64(len(blob.Value))
data.filePath = blobPath
log.Debug("delete blob path", zap.String("path", blobPath))
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{
/*
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{
BaseKV: m.BaseKV,
data: kvs,
}, data, pos)*/
m.handleDeleteTask(segmentID, &flushBufferDeleteTask{
BaseKV: m.BaseKV,
data: kvs,
}, data, pos)
......@@ -381,6 +471,39 @@ func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *interna
return collID, partID, meta, nil
}
// waitForAllTaskQueue waits for all flush queues in dispatcher become empty
func (m *rendezvousFlushManager) waitForAllFlushQueue() {
var wg sync.WaitGroup
m.dispatcher.Range(func(k, v interface{}) bool {
queue := v.(*orderFlushQueue)
wg.Add(1)
go func() {
<-queue.tailCh
wg.Done()
}()
return true
})
wg.Wait()
}
// startDropping changes flush manager into dropping mode
func (m *rendezvousFlushManager) startDropping() {
m.dropping.Store(true)
m.dropHandler.allFlushed = make(chan struct{})
go func() {
<-m.dropHandler.allFlushed // all needed flush tasks are in flush manager now
m.waitForAllFlushQueue() // waits for all the normal flush queue done
m.dropHandler.dropFlushWg.Wait() // waits for all drop mode task done
m.dropHandler.Lock()
defer m.dropHandler.Unlock()
m.dropHandler.flushAndDrop(m.dropHandler.packs) // invoke drop & flush
}()
}
func (m *rendezvousFlushManager) notifyAllFlushed() {
close(m.dropHandler.allFlushed)
}
// close cleans up all the left members
func (m *rendezvousFlushManager) close() {
m.dispatcher.Range(func(k, v interface{}) bool {
......@@ -422,12 +545,122 @@ func (t *flushBufferDeleteTask) flushDeleteData() error {
}
// NewRendezvousFlushManager create rendezvousFlushManager with provided allocator and kv
func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc) *rendezvousFlushManager {
return &rendezvousFlushManager{
func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager {
fm := &rendezvousFlushManager{
allocatorInterface: allocator,
BaseKV: kv,
notifyFunc: f,
Replica: replica,
dropHandler: dropHandler{
flushAndDrop: drop,
},
}
// start with normal mode
fm.dropping.Store(false)
return fm
}
func getFieldBinlogs(fieldID UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
for _, binlog := range binlogs {
if fieldID == binlog.GetFieldID() {
return binlog
}
}
return nil
}
func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) flushAndDropFunc {
return func(packs []*segmentFlushPack) {
req := &datapb.DropVirtualChannelRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO msg type
MsgID: 0, //TODO msg id
Timestamp: 0, //TODO time stamp
SourceID: Params.NodeID,
},
ChannelName: dsService.vchannelName,
}
segmentPack := make(map[UniqueID]*datapb.DropVirtualChannelSegment)
for _, pack := range packs {
segment, has := segmentPack[pack.segmentID]
if !has {
segment = &datapb.DropVirtualChannelSegment{
SegmentID: pack.segmentID,
CollectionID: dsService.collectionID,
}
segmentPack[pack.segmentID] = segment
}
for k, v := range pack.insertLogs {
fieldBinlogs := getFieldBinlogs(k, segment.Field2BinlogPaths)
if fieldBinlogs == nil {
segment.Field2BinlogPaths = append(segment.Field2BinlogPaths, &datapb.FieldBinlog{
FieldID: k,
Binlogs: []string{v},
})
} else {
fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, v)
}
}
for k, v := range pack.statsLogs {
fieldStatsLogs := getFieldBinlogs(k, segment.Field2StatslogPaths)
if fieldStatsLogs == nil {
segment.Field2StatslogPaths = append(segment.Field2StatslogPaths, &datapb.FieldBinlog{
FieldID: k,
Binlogs: []string{v},
})
} else {
fieldStatsLogs.Binlogs = append(fieldStatsLogs.Binlogs, v)
}
}
for _, delData := range pack.deltaLogs {
segment.Deltalogs = append(segment.Deltalogs, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogPath: delData.filePath, DeltaLogSize: delData.fileSize})
}
updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID)
segment.NumOfRows = updates.GetNumRows()
if pack.pos != nil {
if segment.CheckPoint == nil || pack.pos.Timestamp > segment.CheckPoint.Timestamp {
segment.CheckPoint = pack.pos
}
}
}
// start positions for all new segments
for _, pos := range dsService.replica.listNewSegmentsStartPositions() {
segment, has := segmentPack[pos.GetSegmentID()]
if !has {
segment = &datapb.DropVirtualChannelSegment{
SegmentID: pos.GetSegmentID(),
CollectionID: dsService.collectionID,
}
segmentPack[pos.GetSegmentID()] = segment
}
segment.StartPosition = pos.GetStartPosition()
}
err := retry.Do(context.Background(), func() error {
rsp, err := dsService.dataCoord.DropVirtualChannel(context.Background(), req)
// should be network issue, return error and retry
if err != nil {
return fmt.Errorf(err.Error())
}
// TODO should retry only when datacoord status is unhealthy
if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason())
}
return nil
}, opts...)
if err != nil {
log.Warn("failed to DropVirtualChannel", zap.String("channel", dsService.vchannelName), zap.Error(err))
panic(err)
}
for segID := range segmentPack {
dsService.replica.segmentFlushed(segID)
dsService.flushingSegCache.Remove(segID)
}
}
}
......
......@@ -22,6 +22,7 @@ import (
"errors"
"sync"
"testing"
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -144,7 +145,7 @@ func TestRendezvousFlushManager(t *testing.T) {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
})
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
for i := 0; i < size; i++ {
......@@ -185,7 +186,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
packMut.Unlock()
counter.Inc()
finish.Done()
})
}, emptyFlushAndDropFunc)
ti := newTaskInjection(1, func(*segmentFlushPack) {})
m.injectFlush(ti, 1)
......@@ -276,7 +277,7 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
memkv := memkv.NewMemoryKV()
replica := newMockReplica()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {
})
}, emptyFlushAndDropFunc)
// non exists segment
_, _, _, err := fm.getSegmentMeta(-1, &internalpb.MsgPosition{})
......@@ -293,6 +294,128 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
assert.Error(t, err)
}
func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
kv := memkv.NewMemoryKV()
size := 1000
var counter atomic.Int64
var finish sync.WaitGroup
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
for i := 0; i < size; i++ {
id := make([]byte, 10)
rand.Read(id)
ids = append(ids, id)
}
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
var finished bool
var mut sync.RWMutex
signal := make(chan struct{})
go func() {
m.waitForAllFlushQueue()
mut.Lock()
finished = true
mut.Unlock()
close(signal)
}()
mut.RLock()
assert.False(t, finished)
mut.RUnlock()
for i := 0; i < size/2; i++ {
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
mut.RLock()
assert.False(t, finished)
mut.RUnlock()
for i := size / 2; i < size; i++ {
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
timeout := time.NewTimer(time.Second)
select {
case <-timeout.C:
t.FailNow()
case <-signal:
}
mut.RLock()
assert.True(t, finished)
mut.RUnlock()
}
func TestRendezvousFlushManager_dropMode(t *testing.T) {
kv := memkv.NewMemoryKV()
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
mut.Unlock()
close(signal)
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
MsgID: []byte{1},
})
}
m.notifyAllFlushed()
<-signal
mut.Lock()
defer mut.Unlock()
output := make(map[int64]struct{})
for _, pack := range result {
assert.NotEqual(t, -1, pack.segmentID)
output[pack.segmentID] = struct{}{}
_, has := target[pack.segmentID]
assert.True(t, has)
}
assert.Equal(t, len(target), len(output))
}
func TestRendezvousFlushManager_close(t *testing.T) {
kv := memkv.NewMemoryKV()
......@@ -303,7 +426,7 @@ func TestRendezvousFlushManager_close(t *testing.T) {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
})
}, emptyFlushAndDropFunc)
ids := make([][]byte, 0, size)
for i := 0; i < size; i++ {
......@@ -331,8 +454,6 @@ func TestRendezvousFlushManager_close(t *testing.T) {
}
func TestFlushNotifyFunc(t *testing.T) {
// replica :=
// rcf := &RootCoordFactory{}
ctx := context.Background()
rcf := &RootCoordFactory{}
......@@ -382,3 +503,70 @@ func TestFlushNotifyFunc(t *testing.T) {
})
})
}
func TestDropVirtualChannelFunc(t *testing.T) {
ctx := context.Background()
rcf := &RootCoordFactory{}
replica, err := newReplica(ctx, rcf, 1)
require.NoError(t, err)
dataCoord := &DataCoordFactory{}
flushingCache := newCache()
dsService := &dataSyncService{
collectionID: 1,
replica: replica,
dataCoord: dataCoord,
flushingSegCache: flushingCache,
vchannelName: "vchan_01",
}
dropFunc := dropVirtualChannelFunc(dsService, retry.Attempts(1))
t.Run("normal run", func(t *testing.T) {
replica.addNewSegment(2, 1, 10, "vchan_01", &internalpb.MsgPosition{
ChannelName: "vchan_01",
MsgID: []byte{1, 2, 3},
Timestamp: 10,
}, nil)
assert.NotPanics(t, func() {
dropFunc([]*segmentFlushPack{
{
segmentID: 1,
insertLogs: map[UniqueID]string{1: "/dev/test/id"},
statsLogs: map[UniqueID]string{1: "/dev/test/id-stats"},
deltaLogs: []*DelDataBuf{{filePath: "/dev/test/del"}},
pos: &internalpb.MsgPosition{
ChannelName: "vchan_01",
MsgID: []byte{1, 2, 3},
Timestamp: 10,
},
},
{
segmentID: 1,
insertLogs: map[UniqueID]string{1: "/dev/test/id_2"},
statsLogs: map[UniqueID]string{1: "/dev/test/id-stats-2"},
deltaLogs: []*DelDataBuf{{filePath: "/dev/test/del-2"}},
pos: &internalpb.MsgPosition{
ChannelName: "vchan_01",
MsgID: []byte{1, 2, 3},
Timestamp: 30,
},
},
})
})
})
t.Run("datacoord drop fails", func(t *testing.T) {
dataCoord.DropVirtualChannelNotSuccess = true
assert.Panics(t, func() {
dropFunc(nil)
})
})
t.Run("datacoord call error", func(t *testing.T) {
dataCoord.DropVirtualChannelNotSuccess = false
dataCoord.DropVirtualChannelError = true
assert.Panics(t, func() {
dropFunc(nil)
})
})
}
......@@ -49,6 +49,8 @@ import (
const ctxTimeInMillisecond = 5000
const debug = false
var emptyFlushAndDropFunc flushAndDropFunc = func(_ []*segmentFlushPack) {}
func newIDLEDataNodeMock(ctx context.Context) *DataNode {
msFactory := msgstream.NewPmsFactory()
node := NewDataNode(ctx, msFactory)
......@@ -160,6 +162,9 @@ type DataCoordFactory struct {
CompleteCompactionError bool
CompleteCompactionNotSuccess bool
DropVirtualChannelError bool
DropVirtualChannelNotSuccess bool
}
func (ds *DataCoordFactory) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
......@@ -184,6 +189,24 @@ func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.Sav
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
if ds.DropVirtualChannelError {
return nil, errors.New("error")
}
if ds.DropVirtualChannelNotSuccess {
return &datapb.DropVirtualChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
return &datapb.DropVirtualChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
func (mf *MetaFactory) GetCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
sch := schemapb.CollectionSchema{
Name: collectionName,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册