Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
e6de86a4
M
milvus
项目概览
milvus
/
milvus
10 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
e6de86a4
编写于
8月 31, 2021
作者:
C
Cai Yudong
提交者:
GitHub
8月 31, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Apply for msgstream from pool when creating collection (#7323)
Signed-off-by:
N
yudong.cai
<
yudong.cai@zilliz.com
>
上级
8a94a9ee
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
256 addition
and
130 deletion
+256
-130
configs/advanced/root_coord.yaml
configs/advanced/root_coord.yaml
+1
-0
internal/datanode/data_sync_service.go
internal/datanode/data_sync_service.go
+2
-3
internal/datanode/flow_graph_dd_node.go
internal/datanode/flow_graph_dd_node.go
+8
-1
internal/datanode/flow_graph_dmstream_input_node.go
internal/datanode/flow_graph_dmstream_input_node.go
+7
-2
internal/datanode/flow_graph_insert_buffer_node.go
internal/datanode/flow_graph_insert_buffer_node.go
+7
-3
internal/querynode/flow_graph_filter_dm_node.go
internal/querynode/flow_graph_filter_dm_node.go
+3
-3
internal/querynode/flow_graph_query_node.go
internal/querynode/flow_graph_query_node.go
+1
-0
internal/rootcoord/dml_channels.go
internal/rootcoord/dml_channels.go
+74
-78
internal/rootcoord/dml_channels_test.go
internal/rootcoord/dml_channels_test.go
+77
-0
internal/rootcoord/param_table.go
internal/rootcoord/param_table.go
+6
-0
internal/rootcoord/root_coord.go
internal/rootcoord/root_coord.go
+5
-5
internal/rootcoord/root_coord_test.go
internal/rootcoord/root_coord_test.go
+5
-2
internal/rootcoord/task.go
internal/rootcoord/task.go
+3
-5
internal/rootcoord/timeticksync.go
internal/rootcoord/timeticksync.go
+42
-24
internal/util/timerecord/time_recorder.go
internal/util/timerecord/time_recorder.go
+15
-4
未找到文件。
configs/advanced/root_coord.yaml
浏览文件 @
e6de86a4
...
...
@@ -10,6 +10,7 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
rootcoord
:
dmlChannelNum
:
64
maxPartitionNum
:
4096
minSegmentSizeToEnableIndex
:
1024
timeout
:
3600
# time out, 5 seconds
...
...
internal/datanode/data_sync_service.go
浏览文件 @
e6de86a4
...
...
@@ -19,7 +19,6 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
...
...
@@ -148,11 +147,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return
nil
}
pchan
:=
rootcoord
.
ToPhysicalChannel
(
vchanInfo
.
GetChannelName
())
var
dmStreamNode
Node
=
newDmInputNode
(
dsService
.
ctx
,
dsService
.
msFactory
,
pchan
,
vchanInfo
.
CollectionID
,
vchanInfo
.
GetChannelName
(),
vchanInfo
.
GetSeekPosition
(),
)
var
ddNode
Node
=
newDDNode
(
dsService
.
clearSignal
,
dsService
.
collectionID
,
vchanInfo
)
...
...
internal/datanode/flow_graph_dd_node.go
浏览文件 @
e6de86a4
...
...
@@ -92,15 +92,22 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
case
commonpb
.
MsgType_Insert
:
log
.
Debug
(
"DDNode with insert messages"
)
imsg
:=
msg
.
(
*
msgstream
.
InsertMsg
)
if
msg
.
EndTs
()
<
FilterThreshold
{
log
.
Info
(
"Filtering Insert Messages"
,
zap
.
Uint64
(
"Message endts"
,
msg
.
EndTs
()),
zap
.
Uint64
(
"FilterThreshold"
,
FilterThreshold
),
)
if
ddn
.
filterFlushedSegmentInsertMessages
(
msg
.
(
*
msgstream
.
InsertMsg
)
)
{
if
ddn
.
filterFlushedSegmentInsertMessages
(
imsg
)
{
continue
}
}
if
imsg
.
CollectionID
!=
ddn
.
collectionID
{
//log.Debug("filter invalid InsertMsg, collection mis-match",
// zap.Int64("msg collID", imsg.CollectionID),
// zap.Int64("ddn collID", ddn.collectionID))
continue
}
iMsg
.
insertMessages
=
append
(
iMsg
.
insertMessages
,
msg
.
(
*
msgstream
.
InsertMsg
))
}
}
...
...
internal/datanode/flow_graph_dmstream_input_node.go
浏览文件 @
e6de86a4
...
...
@@ -13,19 +13,24 @@ package datanode
import
(
"context"
"strconv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func
newDmInputNode
(
ctx
context
.
Context
,
factory
msgstream
.
Factory
,
pchannel
Name
string
,
seekPos
*
internalpb
.
MsgPosition
)
*
flowgraph
.
InputNode
{
func
newDmInputNode
(
ctx
context
.
Context
,
factory
msgstream
.
Factory
,
collID
UniqueID
,
chan
Name
string
,
seekPos
*
internalpb
.
MsgPosition
)
*
flowgraph
.
InputNode
{
maxQueueLength
:=
Params
.
FlowGraphMaxQueueLength
maxParallelism
:=
Params
.
FlowGraphMaxParallelism
consumeSubName
:=
Params
.
MsgChannelSubName
// subName should be unique, since pchannelName is shared among several collections
consumeSubName
:=
Params
.
MsgChannelSubName
+
"-"
+
strconv
.
FormatInt
(
collID
,
10
)
insertStream
,
_
:=
factory
.
NewTtMsgStream
(
ctx
)
pchannelName
:=
rootcoord
.
ToPhysicalChannel
(
chanName
)
insertStream
.
AsConsumer
([]
string
{
pchannelName
},
consumeSubName
)
log
.
Debug
(
"datanode AsConsumer physical channel: "
+
pchannelName
+
" : "
+
consumeSubName
)
...
...
internal/datanode/flow_graph_insert_buffer_node.go
浏览文件 @
e6de86a4
...
...
@@ -184,9 +184,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
err
:=
ibNode
.
replica
.
addNewSegment
(
currentSegID
,
collID
,
partitionID
,
msg
.
GetChannelID
(),
iMsg
.
startPositions
[
0
],
iMsg
.
endPositions
[
0
])
if
err
!=
nil
{
log
.
Error
(
"add segment wrong"
,
zap
.
Error
(
err
))
log
.
Error
(
"add segment wrong"
,
zap
.
Int64
(
"segID"
,
currentSegID
),
zap
.
Int64
(
"collID"
,
collID
),
zap
.
Int64
(
"partID"
,
partitionID
),
zap
.
String
(
"chanName"
,
msg
.
GetChannelID
()),
zap
.
Error
(
err
))
}
}
segNum
:=
uniqueSeg
[
currentSegID
]
...
...
@@ -199,7 +203,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
err
:=
ibNode
.
replica
.
updateStatistics
(
id
,
num
)
if
err
!=
nil
{
log
.
Error
(
"update Segment Row number wrong"
,
zap
.
Error
(
err
))
log
.
Error
(
"update Segment Row number wrong"
,
zap
.
Int64
(
"segID"
,
id
),
zap
.
Error
(
err
))
}
}
...
...
internal/querynode/flow_graph_filter_dm_node.go
浏览文件 @
e6de86a4
...
...
@@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// check if the collection from message is target collection
if
msg
.
CollectionID
!=
fdmNode
.
collectionID
{
log
.
Debug
(
"filter invalid insert message, collection is not the target collection"
,
zap
.
Any
(
"collectionID"
,
msg
.
CollectionID
),
zap
.
Any
(
"partitionID"
,
msg
.
PartitionID
))
//
log.Debug("filter invalid insert message, collection is not the target collection",
//
zap.Any("collectionID", msg.CollectionID),
//
zap.Any("partitionID", msg.PartitionID))
return
nil
}
...
...
internal/querynode/flow_graph_query_node.go
浏览文件 @
e6de86a4
...
...
@@ -125,6 +125,7 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
log
.
Debug
(
"query node flow graph consumes from pChannel"
,
zap
.
Any
(
"collectionID"
,
q
.
collectionID
),
zap
.
Any
(
"channel"
,
channel
),
zap
.
Any
(
"subName"
,
subName
),
)
return
nil
}
...
...
internal/rootcoord/dml_channels.go
浏览文件 @
e6de86a4
...
...
@@ -15,86 +15,80 @@ import (
"fmt"
"sync"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"g
o.uber.org/zap
"
"g
ithub.com/milvus-io/milvus/internal/util/funcutil
"
)
type
dmlChannels
struct
{
core
*
Core
lock
sync
.
RWMutex
dml
map
[
string
]
msgstream
.
MsgStream
core
*
Core
namePrefix
string
capacity
int64
refcnt
sync
.
Map
idx
*
atomic
.
Int64
pool
sync
.
Map
}
func
newDMLChannels
(
c
*
Core
)
*
dmlChannels
{
return
&
dmlChannels
{
core
:
c
,
lock
:
sync
.
RWMutex
{},
dml
:
make
(
map
[
string
]
msgstream
.
MsgStream
),
func
newDmlChannels
(
c
*
Core
,
chanNum
int64
)
*
dmlChannels
{
d
:=
&
dmlChannels
{
core
:
c
,
namePrefix
:
fmt
.
Sprintf
(
"dml_%s"
,
funcutil
.
RandomString
(
8
)),
capacity
:
chanNum
,
refcnt
:
sync
.
Map
{},
idx
:
atomic
.
NewInt64
(
0
),
pool
:
sync
.
Map
{},
}
var
i
int64
for
i
=
0
;
i
<
chanNum
;
i
++
{
name
:=
fmt
.
Sprintf
(
"%s_%d"
,
d
.
namePrefix
,
i
)
ms
,
err
:=
c
.
msFactory
.
NewMsgStream
(
c
.
ctx
)
if
err
!=
nil
{
log
.
Error
(
"add msgstream failed"
,
zap
.
String
(
"name"
,
name
),
zap
.
Error
(
err
))
panic
(
"add msgstream failed"
)
}
ms
.
AsProducer
([]
string
{
name
})
d
.
pool
.
Store
(
name
,
&
ms
)
}
log
.
Debug
(
"init dml channels"
,
zap
.
Int64
(
"num"
,
chanNum
))
return
d
}
// GetNumChannels get current dml channel count
func
(
d
*
dmlChannels
)
GetNumChannels
()
int
{
d
.
lock
.
RLock
(
)
d
efer
d
.
lock
.
RUnlock
(
)
return
len
(
d
.
dml
)
func
(
d
*
dmlChannels
)
GetDmlMsgStreamName
()
string
{
cnt
:=
d
.
idx
.
Load
()
name
:=
fmt
.
Sprintf
(
"%s_%d"
,
d
.
namePrefix
,
cnt
)
d
.
idx
.
Store
((
cnt
+
1
)
%
d
.
capacity
)
return
name
}
// ListChannels lists all dml channel names
func
(
d
*
dmlChannels
)
ListChannels
()
[]
string
{
d
.
lock
.
RLock
()
defer
d
.
lock
.
RUnlock
()
ret
:=
make
([]
string
,
0
,
len
(
d
.
dml
))
for
n
:=
range
d
.
dml
{
ret
=
append
(
ret
,
n
)
}
return
ret
chanNames
:=
make
([]
string
,
0
)
d
.
refcnt
.
Range
(
func
(
k
,
v
interface
{})
bool
{
chanNames
=
append
(
chanNames
,
k
.
(
string
))
return
true
})
return
chanNames
}
// Produce produces msg pack into specified channel
func
(
d
*
dmlChannels
)
Produce
(
name
string
,
pack
*
msgstream
.
MsgPack
)
error
{
d
.
lock
.
Lock
()
defer
d
.
lock
.
Unlock
()
ds
,
ok
:=
d
.
dml
[
name
]
if
!
ok
{
return
fmt
.
Errorf
(
"channel %s not exist"
,
name
)
}
if
err
:=
ds
.
Produce
(
pack
);
err
!=
nil
{
return
err
}
return
nil
// GetNumChannels get current dml channel count
func
(
d
*
dmlChannels
)
GetNumChannels
()
int
{
return
len
(
d
.
ListChannels
())
}
// Broadcast broadcasts msg pack into specified channel
func
(
d
*
dmlChannels
)
Broadcast
(
name
string
,
pack
*
msgstream
.
MsgPack
)
error
{
d
.
lock
.
Lock
()
defer
d
.
lock
.
Unlock
()
ds
,
ok
:=
d
.
dml
[
name
]
if
!
ok
{
return
fmt
.
Errorf
(
"channel %s not exist"
,
name
)
}
if
err
:=
ds
.
Broadcast
(
pack
);
err
!=
nil
{
return
err
}
return
nil
}
// BroadcastAll invoke broadcast with provided msg pack in all channels specified
func
(
d
*
dmlChannels
)
BroadcastAll
(
channels
[]
string
,
pack
*
msgstream
.
MsgPack
)
error
{
d
.
lock
.
Lock
()
defer
d
.
lock
.
Unlock
()
for
_
,
ch
:=
range
channels
{
ds
,
ok
:=
d
.
dml
[
ch
]
if
!
ok
{
return
fmt
.
Errorf
(
"channel %s not exist"
,
ch
)
func
(
d
*
dmlChannels
)
Broadcast
(
chanNames
[]
string
,
pack
*
msgstream
.
MsgPack
)
error
{
for
_
,
chanName
:=
range
chanNames
{
// only in-use chanName exist in refcnt
if
_
,
ok
:=
d
.
refcnt
.
Load
(
chanName
);
!
ok
{
return
fmt
.
Errorf
(
"channel %s not exist"
,
chanName
)
}
if
err
:=
ds
.
Broadcast
(
pack
);
err
!=
nil
{
v
,
_
:=
d
.
pool
.
Load
(
chanName
)
if
err
:=
(
*
(
v
.
(
*
msgstream
.
MsgStream
)))
.
Broadcast
(
pack
);
err
!=
nil
{
return
err
}
}
...
...
@@ -103,33 +97,35 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) e
// AddProducerChannels add named channels as producer
func
(
d
*
dmlChannels
)
AddProducerChannels
(
names
...
string
)
{
d
.
lock
.
Lock
()
defer
d
.
lock
.
Unlock
()
for
_
,
name
:=
range
names
{
log
.
Debug
(
"add dml channel"
,
zap
.
String
(
"channel name"
,
name
))
_
,
ok
:=
d
.
dml
[
name
]
if
!
ok
{
ms
,
err
:=
d
.
core
.
msFactory
.
NewMsgStream
(
d
.
core
.
ctx
)
if
err
!=
nil
{
log
.
Debug
(
"add msgstream failed"
,
zap
.
String
(
"name"
,
name
),
zap
.
Error
(
err
))
continue
if
_
,
ok
:=
d
.
pool
.
Load
(
name
);
!
ok
{
log
.
Error
(
"invalid channel name"
,
zap
.
String
(
"chanName"
,
name
))
panic
(
"invalid channel name"
)
}
else
{
var
cnt
int64
if
_
,
ok
:=
d
.
refcnt
.
Load
(
name
);
!
ok
{
cnt
=
1
}
else
{
v
,
_
:=
d
.
refcnt
.
Load
(
name
)
cnt
=
v
.
(
int64
)
+
1
}
ms
.
AsProducer
([]
string
{
name
}
)
d
.
dml
[
name
]
=
ms
d
.
refcnt
.
Store
(
name
,
cnt
)
log
.
Debug
(
"assign dml channel"
,
zap
.
String
(
"chanName"
,
name
),
zap
.
Int64
(
"refcnt"
,
cnt
))
}
}
}
// RemoveProducerChannels removes specified channels
func
(
d
*
dmlChannels
)
RemoveProducerChannels
(
names
...
string
)
{
d
.
lock
.
Lock
()
defer
d
.
lock
.
Unlock
()
for
_
,
name
:=
range
names
{
if
ds
,
ok
:=
d
.
dml
[
name
];
ok
{
ds
.
Close
()
delete
(
d
.
dml
,
name
)
v
,
ok
:=
d
.
refcnt
.
Load
(
name
)
if
ok
{
cnt
:=
v
.
(
int64
)
if
cnt
>
1
{
d
.
refcnt
.
Store
(
name
,
cnt
-
1
)
}
else
{
d
.
refcnt
.
Delete
(
name
)
}
}
}
}
internal/rootcoord/dml_channels_test.go
0 → 100644
浏览文件 @
e6de86a4
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package
rootcoord
import
(
"context"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
)
func
TestDmlChannels
(
t
*
testing
.
T
)
{
const
(
totalDmlChannelNum
=
2
)
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
factory
:=
msgstream
.
NewPmsFactory
()
Params
.
Init
()
m
:=
map
[
string
]
interface
{}{
"pulsarAddress"
:
Params
.
PulsarAddress
,
"receiveBufSize"
:
1024
,
"pulsarBufSize"
:
1024
}
err
:=
factory
.
SetParams
(
m
)
assert
.
Nil
(
t
,
err
)
core
,
err
:=
NewCore
(
ctx
,
factory
)
assert
.
Nil
(
t
,
err
)
dml
:=
newDmlChannels
(
core
,
totalDmlChannelNum
)
chanNames
:=
dml
.
ListChannels
()
assert
.
Equal
(
t
,
0
,
len
(
chanNames
))
randStr
:=
funcutil
.
RandomString
(
8
)
assert
.
Panics
(
t
,
func
()
{
dml
.
AddProducerChannels
(
randStr
)
})
err
=
dml
.
Broadcast
([]
string
{
randStr
},
nil
)
assert
.
NotNil
(
t
,
err
)
assert
.
EqualError
(
t
,
err
,
fmt
.
Sprintf
(
"channel %s not exist"
,
randStr
))
// dml_xxx_0 => {chanName0, chanName2}
// dml_xxx_1 => {chanName1}
chanName0
:=
dml
.
GetDmlMsgStreamName
()
dml
.
AddProducerChannels
(
chanName0
)
assert
.
Equal
(
t
,
1
,
dml
.
GetNumChannels
())
chanName1
:=
dml
.
GetDmlMsgStreamName
()
dml
.
AddProducerChannels
(
chanName1
)
assert
.
Equal
(
t
,
2
,
dml
.
GetNumChannels
())
chanName2
:=
dml
.
GetDmlMsgStreamName
()
dml
.
AddProducerChannels
(
chanName2
)
assert
.
Equal
(
t
,
2
,
dml
.
GetNumChannels
())
dml
.
RemoveProducerChannels
(
chanName0
)
assert
.
Equal
(
t
,
2
,
dml
.
GetNumChannels
())
dml
.
RemoveProducerChannels
(
chanName1
)
assert
.
Equal
(
t
,
1
,
dml
.
GetNumChannels
())
dml
.
RemoveProducerChannels
(
chanName0
)
assert
.
Equal
(
t
,
0
,
dml
.
GetNumChannels
())
}
internal/rootcoord/param_table.go
浏览文件 @
e6de86a4
...
...
@@ -40,6 +40,7 @@ type ParamTable struct {
TimeTickChannel
string
StatisticsChannel
string
DmlChannelNum
int64
MaxPartitionNum
int64
DefaultPartitionName
string
DefaultIndexName
string
...
...
@@ -72,6 +73,7 @@ func (p *ParamTable) Init() {
p
.
initTimeTickChannel
()
p
.
initStatisticsChannelName
()
p
.
initDmlChannelNum
()
p
.
initMaxPartitionNum
()
p
.
initMinSegmentSizeToEnableIndex
()
p
.
initDefaultPartitionName
()
...
...
@@ -161,6 +163,10 @@ func (p *ParamTable) initStatisticsChannelName() {
p
.
StatisticsChannel
=
channel
}
func
(
p
*
ParamTable
)
initDmlChannelNum
()
{
p
.
DmlChannelNum
=
p
.
ParseInt64
(
"rootcoord.dmlChannelNum"
)
}
func
(
p
*
ParamTable
)
initMaxPartitionNum
()
{
p
.
MaxPartitionNum
=
p
.
ParseInt64
(
"rootcoord.maxPartitionNum"
)
}
...
...
internal/rootcoord/root_coord.go
浏览文件 @
e6de86a4
...
...
@@ -495,7 +495,7 @@ func (c *Core) setMsgStreams() error {
CreateCollectionRequest
:
*
req
,
}
msgPack
.
Msgs
=
append
(
msgPack
.
Msgs
,
msg
)
return
c
.
dmlChannels
.
Broadcast
All
(
channelNames
,
&
msgPack
)
return
c
.
dmlChannels
.
Broadcast
(
channelNames
,
&
msgPack
)
}
c
.
SendDdDropCollectionReq
=
func
(
ctx
context
.
Context
,
req
*
internalpb
.
DropCollectionRequest
,
channelNames
[]
string
)
error
{
...
...
@@ -511,7 +511,7 @@ func (c *Core) setMsgStreams() error {
DropCollectionRequest
:
*
req
,
}
msgPack
.
Msgs
=
append
(
msgPack
.
Msgs
,
msg
)
return
c
.
dmlChannels
.
Broadcast
All
(
channelNames
,
&
msgPack
)
return
c
.
dmlChannels
.
Broadcast
(
channelNames
,
&
msgPack
)
}
c
.
SendDdCreatePartitionReq
=
func
(
ctx
context
.
Context
,
req
*
internalpb
.
CreatePartitionRequest
,
channelNames
[]
string
)
error
{
...
...
@@ -527,7 +527,7 @@ func (c *Core) setMsgStreams() error {
CreatePartitionRequest
:
*
req
,
}
msgPack
.
Msgs
=
append
(
msgPack
.
Msgs
,
msg
)
return
c
.
dmlChannels
.
Broadcast
All
(
channelNames
,
&
msgPack
)
return
c
.
dmlChannels
.
Broadcast
(
channelNames
,
&
msgPack
)
}
c
.
SendDdDropPartitionReq
=
func
(
ctx
context
.
Context
,
req
*
internalpb
.
DropPartitionRequest
,
channelNames
[]
string
)
error
{
...
...
@@ -543,7 +543,7 @@ func (c *Core) setMsgStreams() error {
DropPartitionRequest
:
*
req
,
}
msgPack
.
Msgs
=
append
(
msgPack
.
Msgs
,
msg
)
return
c
.
dmlChannels
.
Broadcast
All
(
channelNames
,
&
msgPack
)
return
c
.
dmlChannels
.
Broadcast
(
channelNames
,
&
msgPack
)
}
return
nil
...
...
@@ -959,7 +959,7 @@ func (c *Core) Init() error {
return
}
c
.
dmlChannels
=
newD
MLChannels
(
c
)
c
.
dmlChannels
=
newD
mlChannels
(
c
,
Params
.
DmlChannelNum
)
pc
:=
c
.
MetaTable
.
ListCollectionPhysicalChannels
()
c
.
dmlChannels
.
AddProducerChannels
(
pc
...
)
...
...
internal/rootcoord/root_coord_test.go
浏览文件 @
e6de86a4
...
...
@@ -410,7 +410,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
commonpb
.
ErrorCode_Success
,
status
.
ErrorCode
)
assert
.
Equal
(
t
,
2
,
len
(
core
.
dmlChannels
.
dml
))
assert
.
Equal
(
t
,
2
,
core
.
dmlChannels
.
GetNumChannels
(
))
pChan
:=
core
.
MetaTable
.
ListCollectionPhysicalChannels
()
dmlStream
.
AsConsumer
([]
string
{
pChan
[
0
]},
Params
.
MsgChannelSubName
)
...
...
@@ -1431,7 +1431,10 @@ func TestRootCoord(t *testing.T) {
assert
.
Nil
(
t
,
err
)
time
.
Sleep
(
100
*
time
.
Millisecond
)
core
.
dmlChannels
.
AddProducerChannels
(
"c0"
,
"c1"
,
"c2"
)
cn0
:=
core
.
dmlChannels
.
GetDmlMsgStreamName
()
cn1
:=
core
.
dmlChannels
.
GetDmlMsgStreamName
()
cn2
:=
core
.
dmlChannels
.
GetDmlMsgStreamName
()
core
.
dmlChannels
.
AddProducerChannels
(
cn0
,
cn1
,
cn2
)
msg0
:=
&
internalpb
.
ChannelTimeTickMsg
{
Base
:
&
commonpb
.
MsgBase
{
...
...
internal/rootcoord/task.go
浏览文件 @
e6de86a4
...
...
@@ -136,7 +136,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
vchanNames
:=
make
([]
string
,
t
.
Req
.
ShardsNum
)
chanNames
:=
make
([]
string
,
t
.
Req
.
ShardsNum
)
for
i
:=
int32
(
0
);
i
<
t
.
Req
.
ShardsNum
;
i
++
{
vchanNames
[
i
]
=
fmt
.
Sprintf
(
"%s_%d
_%d_v"
,
t
.
Req
.
CollectionName
,
collID
,
i
)
vchanNames
[
i
]
=
fmt
.
Sprintf
(
"%s_%d
v"
,
t
.
core
.
dmlChannels
.
GetDmlMsgStreamName
(),
collID
)
chanNames
[
i
]
=
ToPhysicalChannel
(
vchanNames
[
i
])
}
...
...
@@ -286,10 +286,8 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
t
.
core
.
chanTimeTick
.
RemoveDdlTimeTick
(
ts
,
reason
)
t
.
core
.
SendTimeTick
(
ts
,
reason
)
for
_
,
chanName
:=
range
collMeta
.
PhysicalChannelNames
{
// send tt into deleted channels to tell data_node to clear flowgragh
t
.
core
.
chanTimeTick
.
SendChannelTimeTick
(
chanName
,
ts
)
}
// send tt into deleted channels to tell data_node to clear flowgragh
t
.
core
.
chanTimeTick
.
SendTimeTickToChannel
(
collMeta
.
PhysicalChannelNames
,
ts
)
// remove dml channel after send dd msg
t
.
core
.
dmlChannels
.
RemoveProducerChannels
(
collMeta
.
PhysicalChannelNames
...
)
...
...
internal/rootcoord/timeticksync.go
浏览文件 @
e6de86a4
...
...
@@ -22,16 +22,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type
ddlTimetickInfo
struct
{
ddlMinTs
typeutil
.
Timestamp
ddlTsSet
map
[
typeutil
.
Timestamp
]
struct
{}
}
type
timetickSync
struct
{
core
*
Core
lock
sync
.
Mutex
...
...
@@ -191,7 +187,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
t
.
proxyTimeTick
[
in
.
Base
.
SourceID
]
=
newChannelTimeTickMsg
(
in
)
//log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID),
// zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
// zap.
Any("Ts", in.Timestamps), zap.
Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
t
.
sendToChannel
()
return
nil
...
...
@@ -225,34 +221,53 @@ func (t *timetickSync) StartWatch() {
for
{
select
{
case
<-
t
.
core
.
ctx
.
Done
()
:
log
.
Debug
(
"root
coord context done"
,
zap
.
Error
(
t
.
core
.
ctx
.
Err
()))
log
.
Debug
(
"rootcoord context done"
,
zap
.
Error
(
t
.
core
.
ctx
.
Err
()))
return
case
p
tt
,
ok
:=
<-
t
.
sendChan
:
case
p
roxyTimetick
,
ok
:=
<-
t
.
sendChan
:
if
!
ok
{
log
.
Debug
(
"timetickSync sendChan closed"
)
return
}
// reduce each channel to get min timestamp
mtt
:=
ptt
[
t
.
core
.
session
.
ServerID
]
for
_
,
chanName
:=
range
mtt
.
in
.
ChannelNames
{
mints
:=
mtt
.
getTimetick
(
chanName
)
for
_
,
tt
:=
range
ptt
{
ts
:=
tt
.
getTimetick
(
chanName
)
if
ts
<
mints
{
mints
=
ts
local
:=
proxyTimetick
[
t
.
core
.
session
.
ServerID
]
if
len
(
local
.
in
.
ChannelNames
)
==
0
{
continue
}
hdr
:=
fmt
.
Sprintf
(
"send ts to %d channels"
,
len
(
local
.
in
.
ChannelNames
))
tr
:=
timerecord
.
NewTimeRecorder
(
hdr
)
wg
:=
sync
.
WaitGroup
{}
for
_
,
chanName
:=
range
local
.
in
.
ChannelNames
{
wg
.
Add
(
1
)
go
func
(
chanName
string
)
{
mints
:=
local
.
getTimetick
(
chanName
)
for
_
,
tt
:=
range
proxyTimetick
{
ts
:=
tt
.
getTimetick
(
chanName
)
if
ts
<
mints
{
mints
=
ts
}
}
}
if
err
:=
t
.
SendChannelTimeTick
(
chanName
,
mints
);
err
!=
nil
{
log
.
Debug
(
"SendChannelTimeTick fail"
,
zap
.
Error
(
err
))
}
if
err
:=
t
.
SendTimeTickToChannel
([]
string
{
chanName
},
mints
);
err
!=
nil
{
log
.
Debug
(
"SendTimeTickToChannel fail"
,
zap
.
Error
(
err
))
}
wg
.
Done
()
}(
chanName
)
}
wg
.
Wait
()
span
:=
tr
.
ElapseSpan
()
// rootcoord send tt msg to all channels every 200ms by default
if
span
.
Milliseconds
()
>
200
{
log
.
Warn
(
"rootcoord send tt to all channels too slowly"
,
zap
.
Int
(
"chanNum"
,
len
(
local
.
in
.
ChannelNames
)),
zap
.
Int64
(
"span"
,
span
.
Milliseconds
()))
}
}
}
}
// Send
ChannelTimeTick
send each channel's min timetick to msg stream
func
(
t
*
timetickSync
)
Send
ChannelTimeTick
(
chanName
string
,
ts
typeutil
.
Timestamp
)
error
{
// Send
TimeTickToChannel
send each channel's min timetick to msg stream
func
(
t
*
timetickSync
)
Send
TimeTickToChannel
(
chanNames
[]
string
,
ts
typeutil
.
Timestamp
)
error
{
msgPack
:=
msgstream
.
MsgPack
{}
baseMsg
:=
msgstream
.
BaseMsg
{
BeginTimestamp
:
ts
,
...
...
@@ -273,11 +288,14 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam
}
msgPack
.
Msgs
=
append
(
msgPack
.
Msgs
,
timeTickMsg
)
err
:=
t
.
core
.
dmlChannels
.
Broadcast
(
chanName
,
&
msgPack
)
if
err
==
nil
{
if
err
:=
t
.
core
.
dmlChannels
.
Broadcast
(
chanNames
,
&
msgPack
);
err
!=
nil
{
return
err
}
for
_
,
chanName
:=
range
chanNames
{
metrics
.
RootCoordInsertChannelTimeTick
.
WithLabelValues
(
chanName
)
.
Set
(
float64
(
tsoutil
.
Mod24H
(
ts
)))
}
return
err
return
nil
}
// GetProxyNum return the num of detected proxy node
...
...
internal/util/timerecord/time_recorder.go
浏览文件 @
e6de86a4
...
...
@@ -33,19 +33,30 @@ func NewTimeRecorder(header string) *TimeRecorder {
}
}
// Record calculates the time span from previous Record call
func
(
tr
*
TimeRecorder
)
Record
(
msg
string
)
time
.
Duration
{
func
(
tr
*
TimeRecorder
)
RecordSpan
()
time
.
Duration
{
curr
:=
time
.
Now
()
span
:=
curr
.
Sub
(
tr
.
last
)
tr
.
last
=
curr
return
span
}
func
(
tr
*
TimeRecorder
)
ElapseSpan
()
time
.
Duration
{
curr
:=
time
.
Now
()
span
:=
curr
.
Sub
(
tr
.
start
)
tr
.
last
=
curr
return
span
}
// Record calculates the time span from previous Record call
func
(
tr
*
TimeRecorder
)
Record
(
msg
string
)
time
.
Duration
{
span
:=
tr
.
RecordSpan
()
tr
.
printTimeRecord
(
msg
,
span
)
return
span
}
// Elapse calculates the time span from the beginning of this TimeRecorder
func
(
tr
*
TimeRecorder
)
Elapse
(
msg
string
)
time
.
Duration
{
curr
:=
time
.
Now
()
span
:=
curr
.
Sub
(
tr
.
start
)
span
:=
tr
.
ElapseSpan
()
tr
.
printTimeRecord
(
msg
,
span
)
return
span
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录