未验证 提交 c4bb73dc 编写于 作者: X XuanYang-cn 提交者: GitHub

Complete drop collection procedure in DN (#11552)

drop, cancel compaction, flush all buffers and release fg

See also: #11426
Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 daaeb27e
......@@ -43,14 +43,14 @@ func (c *Cache) checkIfCached(key UniqueID) bool {
return ok
}
// Cache caches a specific segment ID into the cache
func (c *Cache) Cache(segID UniqueID) {
c.cacheMap.Store(segID, struct{}{})
// Cache caches a specific ID into the cache
func (c *Cache) Cache(ID UniqueID) {
c.cacheMap.Store(ID, struct{}{})
}
// Remove removes a set of segment IDs from the cache
func (c *Cache) Remove(segIDs ...UniqueID) {
for _, id := range segIDs {
// Remove removes a set of IDs from the cache
func (c *Cache) Remove(IDs ...UniqueID) {
for _, id := range IDs {
c.cacheMap.Delete(id)
}
}
......@@ -19,6 +19,7 @@ package datanode
import (
"context"
"runtime"
"sync"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
......@@ -32,6 +33,7 @@ var maxParallelCompactionNum = calculeateParallel()
type compactionExecutor struct {
parallelCh chan struct{}
executing sync.Map // planID to compactor
taskCh chan compactor
}
......@@ -47,6 +49,7 @@ func calculeateParallel() int {
func newCompactionExecutor() *compactionExecutor {
return &compactionExecutor{
parallelCh: make(chan struct{}, maxParallelCompactionNum),
executing: sync.Map{},
taskCh: make(chan compactor, maxTaskNum),
}
}
......@@ -72,6 +75,7 @@ func (c *compactionExecutor) executeTask(task compactor) {
<-c.parallelCh
}()
c.executing.Store(task.getPlanID(), task)
log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()))
err := task.compact()
......@@ -82,5 +86,24 @@ func (c *compactionExecutor) executeTask(task compactor) {
)
}
c.executing.Delete(task.getPlanID())
log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
}
func (c *compactionExecutor) stopTask(planID UniqueID) {
task, loaded := c.executing.LoadAndDelete(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID))
task.(compactor).stop()
}
}
func (c *compactionExecutor) stopExecutingtaskByCollectionID(collID UniqueID) {
c.executing.Range(func(key interface{}, value interface{}) bool {
if value.(compactor).getCollection() == collID {
c.stopTask(key.(UniqueID))
}
return true
})
}
......@@ -28,6 +28,13 @@ func TestCompactionExecutor(t *testing.T) {
ex.execute(newMockCompactor(true))
})
t.Run("Test stopTask", func(t *testing.T) {
ex := newCompactionExecutor()
mc := newMockCompactor(true)
ex.executing.Store(UniqueID(1), mc)
ex.stopTask(UniqueID(1))
})
t.Run("Test start", func(t *testing.T) {
ex := newCompactionExecutor()
ctx, cancel := context.WithCancel(context.TODO())
......@@ -59,16 +66,20 @@ func TestCompactionExecutor(t *testing.T) {
}
func newMockCompactor(isvalid bool) compactor {
return &mockCompactor{isvalid}
func newMockCompactor(isvalid bool) *mockCompactor {
return &mockCompactor{isvalid: isvalid}
}
type mockCompactor struct {
ctx context.Context
cancel context.CancelFunc
isvalid bool
}
var _ compactor = (*mockCompactor)(nil)
func (mc *mockCompactor) compact() error {
if mc.isvalid {
if !mc.isvalid {
return errStart
}
return nil
......@@ -77,3 +88,13 @@ func (mc *mockCompactor) compact() error {
func (mc *mockCompactor) getPlanID() UniqueID {
return 1
}
func (mc *mockCompactor) stop() {
if mc.cancel != nil {
mc.cancel()
}
}
func (mc *mockCompactor) getCollection() UniqueID {
return 1
}
......@@ -49,7 +49,9 @@ type iterator = storage.Iterator
type compactor interface {
compact() error
stop()
getPlanID() UniqueID
getCollection() UniqueID
}
// make sure compactionTask implements compactor interface
......@@ -65,12 +67,16 @@ type compactionTask struct {
dc types.DataCoord
plan *datapb.CompactionPlan
ctx context.Context
cancel context.CancelFunc
}
// check if compactionTask implements compactor
var _ compactor = (*compactionTask)(nil)
func newCompactionTask(
ctx context.Context,
dl downloader,
ul uploader,
replica Replica,
......@@ -78,7 +84,12 @@ func newCompactionTask(
alloc allocatorInterface,
dc types.DataCoord,
plan *datapb.CompactionPlan) *compactionTask {
ctx1, cancel := context.WithCancel(ctx)
return &compactionTask{
ctx: ctx1,
cancel: cancel,
downloader: dl,
uploader: ul,
Replica: replica,
......@@ -89,6 +100,10 @@ func newCompactionTask(
}
}
func (t *compactionTask) stop() {
t.cancel()
}
func (t *compactionTask) getPlanID() UniqueID {
return t.plan.GetPlanID()
}
......@@ -238,7 +253,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
func (t *compactionTask) compact() error {
ctxTimeout, cancelAll := context.WithTimeout(context.Background(), time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
var targetSegID UniqueID
......@@ -590,3 +605,7 @@ func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *et
}
return collID, partID, meta, nil
}
func (t *compactionTask) getCollection() UniqueID {
return t.getCollectionID()
}
......@@ -55,6 +55,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.NotNil(t, meta)
rc.setCollectionID(-2)
task.Replica.(*SegmentReplica).collSchema = nil
_, _, _, err = task.getSegmentMeta(100)
assert.Error(t, err)
})
......@@ -288,7 +289,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
t.Run("Test compact invalid", func(t *testing.T) {
invalidAlloc := NewAllocatorFactory(-1)
emptyTask := &compactionTask{}
ctx, cancel := context.WithCancel(context.TODO())
emptyTask := &compactionTask{
ctx: ctx,
cancel: cancel,
}
emptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{}
plan := &datapb.CompactionPlan{
......@@ -314,6 +319,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
plan.SegmentBinlogs = notEmptySegmentBinlogs
err = emptyTask.compact()
assert.Error(t, err)
emptyTask.stop()
})
t.Run("Test typeI compact valid", func(t *testing.T) {
......@@ -358,7 +365,13 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}
task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
ctx, cancel := context.WithCancel(context.TODO())
cancel()
canceledTask := newCompactionTask(ctx, mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
err = canceledTask.compact()
assert.Error(t, err)
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
err = task.compact()
assert.NoError(t, err)
......@@ -366,6 +379,12 @@ func TestCompactorInterfaceMethods(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(1), updates.GetNumRows())
id := task.getCollection()
assert.Equal(t, UniqueID(1), id)
planID := task.getPlanID()
assert.Equal(t, plan.GetPlanID(), planID)
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
......@@ -458,7 +477,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
}
alloc.random = false // generated ID = 19530
task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
err = task.compact()
assert.NoError(t, err)
......@@ -525,7 +544,7 @@ type mockFlushManager struct {
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error {
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
return nil
}
......
......@@ -98,6 +98,7 @@ type DataNode struct {
Role string
State atomic.Value // internalpb.StateCode_Initializing
// TODO struct
chanMut sync.RWMutex
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels
......@@ -118,6 +119,11 @@ type DataNode struct {
msFactory msgstream.Factory
}
type plan struct {
channelName string
cancel context.CancelFunc
}
// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
......@@ -327,7 +333,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
flushCh := make(chan flushMsg, 100)
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache)
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv)
if err != nil {
return err
}
......@@ -351,6 +357,7 @@ func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) {
select {
case collID := <-collIDCh:
log.Info("GC collection", zap.Int64("ID", collID))
node.stopCompactionOfCollection(collID)
for _, vchanName := range node.getChannelNamesbyCollectionID(collID) {
node.ReleaseDataSyncService(vchanName)
}
......@@ -725,6 +732,12 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}, nil
}
func (node *DataNode) stopCompactionOfCollection(collID UniqueID) {
log.Debug("Stop compaction of collection", zap.Int64("collection ID", collID))
node.compactionExecutor.stopExecutingtaskByCollectionID(collID)
}
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -739,6 +752,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
binlogIO := &binlogIO{node.blobKv, ds.idAllocator}
task := newCompactionTask(
ctx,
binlogIO, binlogIO,
ds.replica,
ds.flushManager,
......
......@@ -21,7 +21,7 @@ import (
"errors"
"fmt"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -47,6 +47,7 @@ type dataSyncService struct {
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
blobKV kv.BaseKV
}
func newDataSyncService(ctx context.Context,
......@@ -58,6 +59,7 @@ func newDataSyncService(ctx context.Context,
clearSignal chan<- UniqueID,
dataCoord types.DataCoord,
flushingSegCache *Cache,
blobKV kv.BaseKV,
) (*dataSyncService, error) {
......@@ -79,6 +81,7 @@ func newDataSyncService(ctx context.Context,
dataCoord: dataCoord,
clearSignal: clearSignal,
flushingSegCache: flushingSegCache,
blobKV: blobKV,
}
if err := service.initNodes(vchan); err != nil {
......@@ -141,23 +144,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
// MinIO
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
minIOKV, err := miniokv.NewMinIOKV(dsService.ctx, option)
if err != nil {
return err
}
// initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack *segmentFlushPack) error {
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, func(pack *segmentFlushPack) error {
fieldInsert := []*datapb.FieldBinlog{}
fieldStats := []*datapb.FieldBinlog{}
deltaInfos := []*datapb.DeltaLogInfo{}
......@@ -205,8 +193,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
Flushed: pack.flushed,
Dropped: pack.dropped,
}
rsp, err := dsService.dataCoord.SaveBinlogPaths(dsService.ctx, req)
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
if err != nil {
return fmt.Errorf(err.Error())
}
......@@ -280,7 +269,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
var ddNode Node = newDDNode(dsService.ctx, dsService.clearSignal, dsService.collectionID, vchanInfo, dsService.msFactory)
var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory)
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
......@@ -294,7 +283,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, c)
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.clearSignal, c)
if err != nil {
return err
}
......
......@@ -28,6 +28,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -146,6 +147,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
make(chan UniqueID),
df,
newCache(),
memkv.NewMemoryKV(),
)
if !test.isValidCase {
......@@ -222,7 +224,7 @@ func TestDataSyncService_Start(t *testing.T) {
}
signalCh := make(chan UniqueID, 100)
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache())
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV())
assert.Nil(t, err)
// sync.replica.addCollection(collMeta.ID, collMeta.Schema)
......
......@@ -19,6 +19,7 @@ package datanode
import (
"context"
"sync"
"sync/atomic"
"go.uber.org/zap"
......@@ -53,13 +54,13 @@ var _ flowgraph.Node = (*ddNode)(nil)
type ddNode struct {
BaseNode
clearSignal chan<- UniqueID
collectionID UniqueID
segID2SegInfo sync.Map // segment ID to *SegmentInfo
flushedSegments []*datapb.SegmentInfo
deltaMsgStream msgstream.MsgStream
dropMode atomic.Value
}
// Name returns node name, implementing flowgraph.Node
......@@ -89,6 +90,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
msg.SetTraceCtx(ctx)
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Debug("ddNode in dropMode")
return []Msg{}
}
var fgMsg = flowGraphMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
......@@ -97,6 +103,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
},
startPositions: make([]*internalpb.MsgPosition, 0),
endPositions: make([]*internalpb.MsgPosition, 0),
dropCollection: false,
}
forwardMsgs := make([]msgstream.TsMsg, 0)
......@@ -104,9 +111,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
log.Info("Destroying current flowgraph", zap.Any("collectionID", ddn.collectionID))
ddn.clearSignal <- ddn.collectionID
return []Msg{}
log.Info("Receiving DropCollection msg", zap.Any("collectionID", ddn.collectionID))
ddn.dropMode.Store(true)
fgMsg.dropCollection = true
}
case commonpb.MsgType_Insert:
log.Debug("DDNode receive insert messages")
......@@ -233,7 +240,7 @@ func (ddn *ddNode) Close() {
}
}
func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode {
func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
......@@ -247,6 +254,7 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID
deltaStream, err := msFactory.NewMsgStream(ctx)
if err != nil {
log.Error(err.Error())
return nil
}
pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName)
......@@ -255,6 +263,7 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID
log.Error(err.Error())
return nil
}
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{deltaChannelName})
log.Debug("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
......@@ -263,12 +272,13 @@ func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID
dd := &ddNode{
BaseNode: baseNode,
clearSignal: clearSignal,
collectionID: collID,
flushedSegments: fs,
deltaMsgStream: deltaMsgStream,
}
dd.dropMode.Store(false)
for _, us := range vchanInfo.GetUnflushedSegments() {
dd.segID2SegInfo.Store(us.GetID(), us)
}
......
......@@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -63,25 +64,28 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
di.DmlPosition = &internalpb.MsgPosition{Timestamp: test.inUnFlushedChannelTs}
}
fi := []*datapb.SegmentInfo{}
var fi []*datapb.SegmentInfo
for _, id := range test.inFlushedSegs {
s := &datapb.SegmentInfo{ID: id}
fi = append(fi, s)
}
mmf := &mockMsgStreamFactory{
true, true,
}
ddNode := newDDNode(
context.Background(),
make(chan UniqueID),
test.inCollID,
&datapb.VchannelInfo{
FlushedSegments: fi,
UnflushedSegments: []*datapb.SegmentInfo{di},
ChannelName: "by-dev-rootcoord-dml-test",
},
msgstream.NewPmsFactory(),
mmf,
)
require.NotNil(t, ddNode)
flushedSegIDs := make([]int64, 0)
var flushedSegIDs []UniqueID
for _, seg := range ddNode.flushedSegments {
flushedSegIDs = append(flushedSegIDs, seg.ID)
}
......@@ -123,17 +127,16 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
// valid inputs
tests := []struct {
ddnClearSignal chan UniqueID
ddnCollID UniqueID
ddnCollID UniqueID
msgCollID UniqueID
expectedChlen int
description string
}{
{make(chan UniqueID, 1), 1, 1, 1,
{1, 1, 1,
"DropCollectionMsg collID == ddNode collID"},
{make(chan UniqueID, 1), 1, 2, 0,
{1, 2, 0,
"DropCollectionMsg collID != ddNode collID"},
}
......@@ -143,7 +146,6 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := ddNode{
clearSignal: test.ddnClearSignal,
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
}
......@@ -158,10 +160,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
rt := ddn.Operate([]Msg{msgStreamMsg})
assert.Equal(t, test.expectedChlen, len(test.ddnClearSignal))
if test.ddnCollID == test.msgCollID {
assert.Empty(t, rt)
assert.NotEmpty(t, rt)
assert.True(t, rt[0].(*flowGraphMsg).dropCollection)
} else {
assert.NotEmpty(t, rt)
}
......
......@@ -44,6 +44,8 @@ type deleteNode struct {
replica Replica
idAllocator allocatorInterface
flushManager flushManager
clearSignal chan<- UniqueID
}
// DelDataBuf buffers insert data, monitoring buffer size and limit
......@@ -206,10 +208,14 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
dn.delBuf.Delete(segmentToFlush)
}
}
}
}
if fgMsg.dropCollection {
log.Debug("DeleteNode reveives dropCollection signal")
dn.clearSignal <- dn.replica.getCollectionID()
}
for _, sp := range spans {
sp.Finish()
}
......@@ -235,7 +241,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []int64) map[int64]
return result
}
func newDeleteNode(ctx context.Context, fm flushManager, config *nodeConfig) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- UniqueID, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
......@@ -248,5 +254,6 @@ func newDeleteNode(ctx context.Context, fm flushManager, config *nodeConfig) (*d
idAllocator: config.allocator,
channelName: config.vChannelName,
flushManager: fm,
clearSignal: sig,
}, nil
}
......@@ -107,7 +107,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteNode(test.ctx, nil, test.config)
dn, err := newDeleteNode(test.ctx, nil, make(chan UniqueID, 1), test.config)
assert.Nil(t, err)
assert.NotNil(t, dn)
......@@ -215,7 +215,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
vChannelName: chanName,
}
dn, err := newDeleteNode(context.Background(), fm, c)
dn, err := newDeleteNode(context.Background(), fm, make(chan UniqueID, 1), c)
assert.Nil(t, err)
results := dn.filterSegmentByPK(0, pks)
......@@ -246,7 +246,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, c)
delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(pks, chanName)
......@@ -270,7 +270,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, c)
delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(pks, chanName)
......
......@@ -21,7 +21,6 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"strconv"
"sync"
......@@ -40,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
......@@ -247,14 +245,45 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
zap.Int64("buffer limit", bd.(*BufferData).limit))
}
segmentsToFlush := make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
// Flush
type flushTask struct {
buffer *BufferData
segmentID UniqueID
flushed bool
dropped bool
}
var (
flushTaskList []flushTask
segmentsToFlush []UniqueID
)
if fgMsg.dropCollection {
segmentsToFlush := ibNode.replica.listAllSegmentIDs()
log.Debug("Recive drop collection req and flushing all segments",
zap.Any("segments", segmentsToFlush))
flushTaskList = make([]flushTask, 0, len(segmentsToFlush))
for _, seg2Flush := range segmentsToFlush {
var buf *BufferData
bd, ok := ibNode.insertBuffer.Load(seg2Flush)
if !ok {
buf = nil
} else {
buf = bd.(*BufferData)
}
flushTaskList = append(flushTaskList, flushTask{
buffer: buf,
segmentID: seg2Flush,
flushed: false,
dropped: true,
})
}
goto flush // Jump over the auto-flush and manual flush procedure
}
flushTaskList := make([]flushTask, 0, len(seg2Upload)+1)
segmentsToFlush = make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
flushTaskList = make([]flushTask, 0, len(seg2Upload)+1)
// Auto Flush
for _, segToFlush := range seg2Upload {
......@@ -267,6 +296,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
buffer: ibuffer,
segmentID: segToFlush,
flushed: false,
dropped: false,
})
}
}
......@@ -274,6 +304,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// Manual Flush
select {
case fmsg := <-ibNode.flushChan:
log.Debug(". Receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
......@@ -299,13 +330,15 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
buffer: buf,
segmentID: currentSegID,
flushed: fmsg.flushed,
dropped: false,
})
}
default:
}
flush:
for _, task := range flushTaskList {
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, endPositions[0])
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
if err != nil {
log.Warn("failed to invoke flushBufferData", zap.Error(err))
} else {
......@@ -327,6 +360,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
startPositions: fgMsg.startPositions,
endPositions: fgMsg.endPositions,
segmentsToFlush: segmentsToFlush,
dropCollection: fgMsg.dropCollection,
}
for _, sp := range spans {
......@@ -719,24 +753,6 @@ func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
return ibNode.segmentStatisticsStream.Produce(&msgPack)
}
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timestamp) (meta *etcdpb.CollectionMeta, err error) {
if !ibNode.replica.hasSegment(segmentID, true) {
return nil, fmt.Errorf("No such segment %d in the replica", segmentID)
}
collID := ibNode.replica.getCollectionID()
sch, err := ibNode.replica.getCollectionSchema(collID, ts)
if err != nil {
return nil, err
}
meta = &etcdpb.CollectionMeta{
ID: collID,
Schema: sch,
}
return
}
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
......
......@@ -622,67 +622,6 @@ func (m *CompactedRootCoord) DescribeCollection(ctx context.Context, in *milvusp
return m.RootCoord.DescribeCollection(ctx, in)
}
func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate"
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
rcf := &RootCoordFactory{}
mockRootCoord := &CompactedRootCoord{
RootCoord: rcf,
compactTs: 100,
}
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
assert.Nil(t, err)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
meta, err := iBNode.getCollMetabySegID(1, 101)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, meta.ID)
_, err = iBNode.getCollMetabySegID(2, 101)
assert.NotNil(t, err)
meta, err = iBNode.getCollMetabySegID(1, 99)
assert.NotNil(t, err)
}
func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
......@@ -740,12 +679,6 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
assert.Nil(t, err)
}
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 99 // ts invalid
err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{})
assert.NotNil(t, err)
}
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid
msg.RowIDs = []int64{} //misaligned data
......
......@@ -38,6 +38,7 @@ type flowGraphMsg struct {
endPositions []*internalpb.MsgPosition
//segmentsToFlush is the signal used by insertBufferNode to notify deleteNode to flush
segmentsToFlush []UniqueID
dropCollection bool
}
func (fgMsg *flowGraphMsg) TimeTick() Timestamp {
......
......@@ -33,7 +33,7 @@ import (
// flushManager defines a flush manager signature
type flushManager interface {
// notify flush manager insert buffer data
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
......@@ -48,6 +48,7 @@ type segmentFlushPack struct {
deltaLogs []*DelDataBuf
pos *internalpb.MsgPosition
flushed bool
dropped bool
}
// notifyMetaFunc notify meta to persistent flush result
......@@ -139,8 +140,8 @@ func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInj
}
// enqueueInsertBuffer put insert buffer data into queue
func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, pos)
func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, dropped, pos)
}
// enqueueDelBuffer put delete buffer data into queue
......@@ -219,12 +220,12 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu
// notify flush manager insert buffer data
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
pos *internalpb.MsgPosition) error {
dropped bool, pos *internalpb.MsgPosition) error {
// empty flush
if data == nil || data.buffer == nil {
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
map[UniqueID]string{}, map[UniqueID]string{}, flushed, pos)
map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos)
return nil
}
......@@ -292,7 +293,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{
BaseKV: m.BaseKV,
data: kvs,
}, field2Insert, field2Stats, flushed, pos)
}, field2Insert, field2Stats, flushed, dropped, pos)
return nil
}
......
......@@ -68,7 +68,7 @@ func TestOrderFlushQueue_Execute(t *testing.T) {
wg.Done()
}(ids[i])
go func(id []byte) {
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, false, &internalpb.MsgPosition{
MsgID: id,
})
wg.Done()
......@@ -107,7 +107,7 @@ func TestOrderFlushQueue_Order(t *testing.T) {
q.enqueueDelFlush(&emptyFlushTask{}, &DelDataBuf{}, &internalpb.MsgPosition{
MsgID: ids[i],
})
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
......@@ -149,7 +149,7 @@ func TestRendezvousFlushManager(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
......@@ -199,7 +199,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
......@@ -212,7 +212,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish.Add(1)
id := make([]byte, 10)
rand.Read(id)
m.flushBufferData(nil, 2, true, &internalpb.MsgPosition{
m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
MsgID: id,
})
......@@ -238,7 +238,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish.Add(1)
rand.Read(id)
m.flushBufferData(nil, 2, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{
MsgID: id,
})
m.flushDelData(nil, 2, &internalpb.MsgPosition{
......
......@@ -58,6 +58,7 @@ type flushTaskRunner struct {
deltaLogs []*DelDataBuf
pos *internalpb.MsgPosition
flushed bool
dropped bool
}
type taskInjection struct {
......@@ -76,12 +77,13 @@ func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <
}
// runFlushInsert executei flush insert task with once and retry
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) {
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) {
t.insertOnce.Do(func() {
t.insertLogs = binlogs
t.statsLogs = statslogs
t.flushed = flushed
t.pos = pos
t.dropped = dropped
go func() {
err := errStart
for err != nil {
......@@ -150,6 +152,7 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
pos: t.pos,
deltaLogs: t.deltaLogs,
flushed: t.flushed,
dropped: t.dropped,
}
return pack
......
......@@ -44,7 +44,7 @@ func TestFlushTaskRunner(t *testing.T) {
assert.False(t, saveFlag)
assert.False(t, nextFlag)
task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, nil)
task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, false, nil)
task.runFlushDel(&emptyFlushTask{}, &DelDataBuf{})
assert.False(t, saveFlag)
......@@ -102,7 +102,7 @@ func TestFlushTaskRunner_Injection(t *testing.T) {
assert.False(t, saveFlag)
assert.False(t, nextFlag)
task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, nil)
task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, false, nil)
task.runFlushDel(&emptyFlushTask{}, &DelDataBuf{})
assert.False(t, saveFlag)
......
......@@ -417,6 +417,7 @@ func (df *DataFactory) GenMsgStreamInsertMsg(idx int, chanName string) *msgstrea
CollectionName: "col1",
PartitionName: "default",
SegmentID: 1,
CollectionID: UniqueID(0),
ShardName: chanName,
Timestamps: []Timestamp{Timestamp(idx + 1000)},
RowIDs: []UniqueID{UniqueID(idx)},
......
......@@ -23,18 +23,19 @@ import (
"sync"
"sync/atomic"
"github.com/bits-and-blooms/bloom/v3"
"go.uber.org/zap"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
)
const (
......@@ -49,6 +50,7 @@ type Replica interface {
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error)
listAllSegmentIDs() []UniqueID
addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint) error
filterSegments(channelName string, partitionID UniqueID) []*Segment
......@@ -618,13 +620,16 @@ func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp
return nil, fmt.Errorf("Not supported collection %v", collID)
}
sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts)
if err != nil {
log.Error("Grpc error", zap.Error(err))
return nil, err
if replica.collSchema == nil {
sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts)
if err != nil {
log.Error("Grpc error", zap.Error(err))
return nil, err
}
replica.collSchema = sch
}
return sch, nil
return replica.collSchema, nil
}
func (replica *SegmentReplica) validCollection(collID UniqueID) bool {
......@@ -700,3 +705,24 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un
replica.flushedSegments[segID] = seg
replica.segMu.Unlock()
}
func (replica *SegmentReplica) listAllSegmentIDs() []UniqueID {
replica.segMu.Lock()
defer replica.segMu.Unlock()
var segIDs []UniqueID
for _, seg := range replica.newSegments {
segIDs = append(segIDs, seg.segmentID)
}
for _, seg := range replica.normalSegments {
segIDs = append(segIDs, seg.segmentID)
}
for _, seg := range replica.flushedSegments {
segIDs = append(segIDs, seg.segmentID)
}
return segIDs
}
......@@ -571,6 +571,18 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test listAllSegmentIDs", func(t *testing.T) {
sr := &SegmentReplica{
newSegments: map[UniqueID]*Segment{1: {segmentID: 1}},
normalSegments: map[UniqueID]*Segment{2: {segmentID: 2}},
flushedSegments: map[UniqueID]*Segment{3: {segmentID: 3}},
}
ids := sr.listAllSegmentIDs()
assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
})
t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
assert.Nil(t, err)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册