未验证 提交 aac6323c 编写于 作者: X XuanYang-cn 提交者: GitHub

Use config as parameter (#9707)

Make functions in flowgraph lesser parameters
Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 66e8583f
......@@ -82,6 +82,26 @@ func newDataSyncService(ctx context.Context,
return service, nil
}
type parallelConfig struct {
maxQueueLength int32
maxParallelism int32
}
type nodeConfig struct {
msFactory msgstream.Factory // msgStream factory
collectionID UniqueID
vChannelName string
replica Replica // Segment replica
allocator allocatorInterface
// defaults
parallelConfig
}
func newParallelConfig() parallelConfig {
return parallelConfig{Params.FlowGraphMaxQueueLength, Params.FlowGraphMaxParallelism}
}
// start starts the flowgraph in datasyncservice
func (dsService *dataSyncService) start() {
if dsService.fg != nil {
......@@ -163,41 +183,37 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
dsService.saveBinlog = saveBinlog
c := &nodeConfig{
msFactory: dsService.msFactory,
collectionID: vchanInfo.GetCollectionID(),
vChannelName: vchanInfo.GetChannelName(),
replica: dsService.replica,
allocator: dsService.idAllocator,
parallelConfig: newParallelConfig(),
}
var dmStreamNode Node
dmStreamNode, err = newDmInputNode(
dsService.ctx,
dsService.msFactory,
vchanInfo.CollectionID,
vchanInfo.GetChannelName(),
vchanInfo.GetSeekPosition(),
)
dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c)
if err != nil {
return err
}
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.replica,
dsService.msFactory,
dsService.idAllocator,
dsService.flushChs.insertBufferCh,
saveBinlog,
vchanInfo.GetChannelName(),
dsService.flushingSegCache,
c,
)
if err != nil {
return err
}
var deleteNode Node
deleteNode, err = newDeleteNode(
dsService.ctx,
dsService.replica,
dsService.idAllocator,
dsService.flushChs.deleteBufferCh,
vchanInfo.GetChannelName(),
)
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushChs.deleteBufferCh, c)
if err != nil {
return err
}
......
......@@ -166,7 +166,8 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo) *ddNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
fs := make([]*datapb.SegmentInfo, 0, len(vchanInfo.GetFlushedSegments()))
fs = append(fs, vchanInfo.GetFlushedSegments()...)
......
......@@ -254,16 +254,10 @@ func (dn *deleteNode) flushDelData(collID UniqueID, timeRange TimeRange) {
}
}
func newDeleteNode(
ctx context.Context,
replica Replica,
idAllocator allocatorInterface,
flushCh <-chan *flushMsg,
channelName string,
) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, flushCh <-chan *flushMsg, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
// MinIO
option := &miniokv.Option{
......@@ -280,12 +274,13 @@ func newDeleteNode(
}
return &deleteNode{
BaseNode: baseNode,
channelName: channelName,
delBuf: sync.Map{},
replica: replica,
idAllocator: idAllocator,
flushCh: flushCh,
minIOKV: minIOKV,
BaseNode: baseNode,
delBuf: sync.Map{},
flushCh: flushCh,
minIOKV: minIOKV,
replica: config.replica,
idAllocator: config.allocator,
channelName: config.vChannelName,
}, nil
}
......@@ -63,18 +63,17 @@ func (replica *mockReplica) getCollectionAndPartitionID(segID UniqueID) (collID,
func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
tests := []struct {
ctx context.Context
replica Replica
idAllocator allocatorInterface
ctx context.Context
config *nodeConfig
description string
}{
{context.Background(), &SegmentReplica{}, &allocator{}, "pointer of SegmentReplica"},
{context.Background(), &nodeConfig{}, "pointer of SegmentReplica"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteNode(test.ctx, test.replica, test.idAllocator, make(chan *flushMsg), "")
dn, err := newDeleteNode(test.ctx, make(chan *flushMsg), test.config)
assert.Nil(t, err)
assert.NotNil(t, dn)
......@@ -173,7 +172,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
)
replica := genMockReplica(segIDs, pks, chanName)
t.Run("Test get segment by primary keys", func(te *testing.T) {
dn, err := newDeleteNode(context.Background(), replica, &allocator{}, make(chan *flushMsg), chanName)
c := &nodeConfig{
replica: replica,
allocator: &allocator{},
vChannelName: chanName,
}
dn, err := newDeleteNode(context.Background(), make(chan *flushMsg), c)
assert.Nil(t, err)
results := dn.filterSegmentByPK(0, pks)
......@@ -200,7 +205,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
Params.DeleteBinlogRootPath = testPath
flushChan := make(chan *flushMsg, 100)
delNode, err := newDeleteNode(ctx, replica, NewAllocatorFactory(), flushChan, chanName)
c := &nodeConfig{
replica: replica,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, flushChan, c)
assert.Nil(te, err)
flushChan <- &flushMsg{
......
......@@ -16,7 +16,6 @@ import (
"fmt"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph"
......@@ -26,21 +25,18 @@ import (
// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
// messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is
// flowgraph ddNode.
func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) (*flowgraph.InputNode, error) {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
// subName should be unique, since pchannelName is shared among several collections
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelSubName, collID)
insertStream, err := factory.NewTtMsgStream(ctx)
consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelSubName, dmNodeConfig.collectionID)
insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
}
// MsgStream needs a physical channel name, but the channel name in seek position from DataCoord
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
pchannelName := rootcoord.ToPhysicalChannel(chanName)
pchannelName := rootcoord.ToPhysicalChannel(dmNodeConfig.vChannelName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName))
......@@ -53,7 +49,6 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu
}
}
var stream msgstream.MsgStream = insertStream
node := flowgraph.NewInputNode(stream, "dmInputNode", maxQueueLength, maxParallelism)
node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
return node, nil
}
......@@ -91,6 +91,6 @@ func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error {
func TestNewDmInputNode(t *testing.T) {
ctx := context.Background()
_, err := newDmInputNode(ctx, &mockMsgStreamFactory{}, 0, "abc_adc", new(internalpb.MsgPosition))
_, err := newDmInputNode(ctx, new(internalpb.MsgPosition), &nodeConfig{msFactory: &mockMsgStreamFactory{}})
assert.Nil(t, err)
}
......@@ -874,23 +874,12 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(
ctx context.Context,
replica Replica,
factory msgstream.Factory,
idAllocator allocatorInterface,
flushCh <-chan *flushMsg,
saveBinlog func(*segmentFlushUnit) error,
channelName string,
flushingSegCache *Cache,
) (*insertBufferNode, error) {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
func newInsertBufferNode(ctx context.Context, flushCh <-chan *flushMsg, saveBinlog func(*segmentFlushUnit) error,
flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
// MinIO
option := &miniokv.Option{
......@@ -908,7 +897,7 @@ func newInsertBufferNode(
}
//input stream, data node time tick
wTt, err := factory.NewMsgStream(ctx)
wTt, err := config.msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
......@@ -918,7 +907,7 @@ func newInsertBufferNode(
wTtMsgStream.Start()
// update statistics channel
segS, err := factory.NewMsgStream(ctx)
segS, err := config.msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
......@@ -931,16 +920,17 @@ func newInsertBufferNode(
BaseNode: baseNode,
insertBuffer: sync.Map{},
minIOKV: minIOKV,
channelName: channelName,
timeTickStream: wTtMsgStream,
segmentStatisticsStream: segStatisticsMsgStream,
replica: replica,
flushMap: sync.Map{},
flushChan: flushCh,
idAllocator: idAllocator,
dsSaveBinlog: saveBinlog,
flushingSegCache: flushingSegCache,
replica: config.replica,
idAllocator: config.allocator,
channelName: config.vChannelName,
}, nil
}
......@@ -84,27 +84,36 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
ctxDone, cancel := context.WithCancel(ctx)
cancel() // cancel now to make context done
_, err = newInsertBufferNode(ctxDone, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
_, err = newInsertBufferNode(ctxDone, flushChan, saveBinlog, newCache(), c)
assert.Error(t, err)
cdf := &CDFMsFactory{
c.msFactory = &CDFMsFactory{
Factory: msFactory,
cd: 0,
}
_, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
_, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
assert.Error(t, err)
cdf = &CDFMsFactory{
c.msFactory = &CDFMsFactory{
Factory: msFactory,
cd: 1,
}
_, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
_, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
assert.Error(t, err)
}
......@@ -170,7 +179,14 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
require.NoError(t, err)
flushChan <- &flushMsg{
......@@ -238,7 +254,14 @@ func TestFlushSegment(t *testing.T) {
saveBinlog := func(*segmentFlushUnit) error {
return nil
}
ibNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
ibNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
require.NoError(t, err)
flushSegment(collMeta,
......@@ -352,7 +375,13 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: colRep,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
......@@ -583,7 +612,13 @@ func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
require.NoError(t, err)
meta, err := iBNode.getCollMetabySegID(1, 101)
......@@ -636,7 +671,13 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
c := &nodeConfig{
replica: replica,
msFactory: msFactory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
require.NoError(t, err)
inMsg := GenFlowGraphInsertMsg(insertChannelName)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册