Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
57ba2361
M
milvus
项目概览
milvus
/
milvus
大约 1 年 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
57ba2361
编写于
2月 05, 2021
作者:
X
xige-16
提交者:
yefu.chen
2月 05, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add mutex for message stream’s asConsumer
Signed-off-by:
N
xige-16
<
xi.ge@zilliz.com
>
上级
7e1887da
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
133 addition
and
79 deletion
+133
-79
internal/msgstream/pulsarms/pulsar_msgstream.go
internal/msgstream/pulsarms/pulsar_msgstream.go
+132
-78
internal/msgstream/pulsarms/pulsar_msgstream_test.go
internal/msgstream/pulsarms/pulsar_msgstream_test.go
+1
-1
未找到文件。
internal/msgstream/pulsarms/pulsar_msgstream.go
浏览文件 @
57ba2361
...
...
@@ -32,6 +32,7 @@ type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg
type
RepackFunc
=
msgstream
.
RepackFunc
type
Consumer
=
pulsar
.
Consumer
type
Producer
=
pulsar
.
Producer
type
MessageID
=
pulsar
.
MessageID
type
UnmarshalDispatcher
=
msgstream
.
UnmarshalDispatcher
type
PulsarMsgStream
struct
{
...
...
@@ -46,6 +47,8 @@ type PulsarMsgStream struct {
wait
*
sync
.
WaitGroup
streamCancel
func
()
pulsarBufSize
int64
consumerLock
*
sync
.
Mutex
consumerReflects
[]
reflect
.
SelectCase
}
func
newPulsarMsgStream
(
ctx
context
.
Context
,
...
...
@@ -58,22 +61,30 @@ func newPulsarMsgStream(ctx context.Context,
producers
:=
make
([]
Producer
,
0
)
consumers
:=
make
([]
Consumer
,
0
)
consumerChannels
:=
make
([]
string
,
0
)
consumerReflects
:=
make
([]
reflect
.
SelectCase
,
0
)
receiveBuf
:=
make
(
chan
*
MsgPack
,
receiveBufSize
)
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
if
err
!=
nil
{
defer
streamCancel
()
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
stream
:=
&
PulsarMsgStream
{
ctx
:
streamCtx
,
streamCancel
:
streamCancel
,
client
:
client
,
producers
:
producers
,
consumers
:
consumers
,
consumerChannels
:
consumerChannels
,
unmarshal
:
unmarshal
,
pulsarBufSize
:
pulsarBufSize
,
receiveBuf
:
receiveBuf
,
streamCancel
:
streamCancel
,
consumerReflects
:
consumerReflects
,
consumerLock
:
&
sync
.
Mutex
{},
}
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
if
err
!=
nil
{
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
stream
.
client
=
client
stream
.
receiveBuf
=
make
(
chan
*
MsgPack
,
receiveBufSize
)
return
stream
,
nil
}
...
...
@@ -118,7 +129,14 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string,
return
errors
.
New
(
"pulsar is not ready, consumer is nil"
)
}
ms
.
consumerLock
.
Lock
()
ms
.
consumers
=
append
(
ms
.
consumers
,
pc
)
ms
.
consumerChannels
=
append
(
ms
.
consumerChannels
,
channels
[
i
])
ms
.
consumerReflects
=
append
(
ms
.
consumerReflects
,
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
pc
.
Chan
()),
})
ms
.
consumerLock
.
Unlock
()
return
nil
}
err
:=
util
.
Retry
(
10
,
time
.
Millisecond
*
200
,
fn
)
...
...
@@ -299,12 +317,6 @@ func (ms *PulsarMsgStream) Consume() *MsgPack {
func
(
ms
*
PulsarMsgStream
)
bufMsgPackToChannel
()
{
defer
ms
.
wait
.
Done
()
cases
:=
make
([]
reflect
.
SelectCase
,
len
(
ms
.
consumers
))
for
i
:=
0
;
i
<
len
(
ms
.
consumers
);
i
++
{
ch
:=
ms
.
consumers
[
i
]
.
Chan
()
cases
[
i
]
=
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
ch
)}
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
...
...
@@ -314,7 +326,9 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
tsMsgList
:=
make
([]
TsMsg
,
0
)
for
{
chosen
,
value
,
ok
:=
reflect
.
Select
(
cases
)
ms
.
consumerLock
.
Lock
()
chosen
,
value
,
ok
:=
reflect
.
Select
(
ms
.
consumerReflects
)
ms
.
consumerLock
.
Unlock
()
if
!
ok
{
log
.
Printf
(
"channel closed"
)
return
...
...
@@ -339,6 +353,11 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
continue
}
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
ChannelName
:
filepath
.
Base
(
pulsarMsg
.
Topic
()),
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
tsMsgList
=
append
(
tsMsgList
,
tsMsg
)
noMoreMessage
:=
true
...
...
@@ -386,6 +405,7 @@ func (ms *PulsarMsgStream) Seek(mp *internalpb2.MsgPosition) error {
type
PulsarTtMsgStream
struct
{
PulsarMsgStream
unsolvedBuf
map
[
Consumer
][]
TsMsg
unsolvedMutex
*
sync
.
Mutex
lastTimeStamp
Timestamp
}
...
...
@@ -394,27 +414,53 @@ func NewPulsarTtMsgStream(ctx context.Context,
receiveBufSize
int64
,
pulsarBufSize
int64
,
unmarshal
msgstream
.
UnmarshalDispatcher
)
(
*
PulsarTtMsgStream
,
error
)
{
streamCtx
,
streamCancel
:=
context
.
WithCancel
(
ctx
)
pulsarMsgStream
:=
PulsarMsgStream
{
ctx
:
streamCtx
,
streamCancel
:
streamCancel
,
pulsarBufSize
:
pulsarBufSize
,
unmarshal
:
unmarshal
,
}
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
pulsarMsgStream
,
err
:=
newPulsarMsgStream
(
ctx
,
address
,
receiveBufSize
,
pulsarBufSize
,
unmarshal
)
if
err
!=
nil
{
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
pulsarMsgStream
.
client
=
client
pulsarMsgStream
.
receiveBuf
=
make
(
chan
*
MsgPack
,
receiveBufSize
)
unsolvedBuf
:=
make
(
map
[
Consumer
][]
TsMsg
)
return
&
PulsarTtMsgStream
{
PulsarMsgStream
:
pulsarMsgStream
,
PulsarMsgStream
:
*
pulsarMsgStream
,
unsolvedBuf
:
unsolvedBuf
,
unsolvedMutex
:
&
sync
.
Mutex
{},
},
nil
}
func
(
ms
*
PulsarTtMsgStream
)
AsConsumer
(
channels
[]
string
,
subName
string
)
{
for
i
:=
0
;
i
<
len
(
channels
);
i
++
{
fn
:=
func
()
error
{
receiveChannel
:=
make
(
chan
pulsar
.
ConsumerMessage
,
ms
.
pulsarBufSize
)
pc
,
err
:=
ms
.
client
.
Subscribe
(
pulsar
.
ConsumerOptions
{
Topic
:
channels
[
i
],
SubscriptionName
:
subName
,
Type
:
pulsar
.
KeyShared
,
SubscriptionInitialPosition
:
pulsar
.
SubscriptionPositionEarliest
,
MessageChannel
:
receiveChannel
,
})
if
err
!=
nil
{
return
err
}
if
pc
==
nil
{
return
errors
.
New
(
"pulsar is not ready, consumer is nil"
)
}
ms
.
consumerLock
.
Lock
()
ms
.
consumers
=
append
(
ms
.
consumers
,
pc
)
ms
.
unsolvedBuf
[
pc
]
=
make
([]
TsMsg
,
0
)
ms
.
consumerChannels
=
append
(
ms
.
consumerChannels
,
channels
[
i
])
ms
.
consumerLock
.
Unlock
()
return
nil
}
err
:=
util
.
Retry
(
10
,
time
.
Millisecond
*
200
,
fn
)
if
err
!=
nil
{
errMsg
:=
"Failed to create consumer "
+
channels
[
i
]
+
", error = "
+
err
.
Error
()
panic
(
errMsg
)
}
}
}
func
(
ms
*
PulsarTtMsgStream
)
Start
()
{
ms
.
wait
=
&
sync
.
WaitGroup
{}
if
ms
.
consumers
!=
nil
{
...
...
@@ -428,33 +474,32 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
ms
.
unsolvedBuf
=
make
(
map
[
Consumer
][]
TsMsg
)
isChannelReady
:=
make
(
map
[
Consumer
]
bool
)
eofMsgTimeStamp
:=
make
(
map
[
Consumer
]
Timestamp
)
for
_
,
consumer
:=
range
ms
.
consumers
{
ms
.
unsolvedBuf
[
consumer
]
=
make
([]
TsMsg
,
0
)
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
default
:
wg
:=
sync
.
WaitGroup
{}
mu
:=
sync
.
Mutex
{}
findMapMutex
:=
sync
.
RWMutex
{}
ms
.
consumerLock
.
Lock
()
for
_
,
consumer
:=
range
ms
.
consumers
{
if
isChannelReady
[
consumer
]
{
continue
}
wg
.
Add
(
1
)
go
ms
.
findTimeTick
(
consumer
,
eofMsgTimeStamp
,
&
wg
,
&
mu
,
&
findMapMutex
)
go
ms
.
findTimeTick
(
consumer
,
eofMsgTimeStamp
,
&
wg
,
&
findMapMutex
)
}
wg
.
Wait
()
timeStamp
,
ok
:=
checkTimeTickMsg
(
eofMsgTimeStamp
,
isChannelReady
,
&
findMapMutex
)
ms
.
consumerLock
.
Unlock
()
if
!
ok
||
timeStamp
<=
ms
.
lastTimeStamp
{
//log.Printf("All timeTick's timestamps are inconsistent")
continue
}
timeTickBuf
:=
make
([]
TsMsg
,
0
)
msgPositions
:=
make
([]
*
internalpb2
.
MsgPosition
,
0
)
ms
.
unsolvedMutex
.
Lock
()
for
consumer
,
msgs
:=
range
ms
.
unsolvedBuf
{
tempBuffer
:=
make
([]
TsMsg
,
0
)
var
timeTickMsg
TsMsg
...
...
@@ -485,6 +530,7 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
})
}
}
ms
.
unsolvedMutex
.
Unlock
()
msgPack
:=
MsgPack
{
BeginTs
:
ms
.
lastTimeStamp
,
...
...
@@ -502,7 +548,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
func
(
ms
*
PulsarTtMsgStream
)
findTimeTick
(
consumer
Consumer
,
eofMsgMap
map
[
Consumer
]
Timestamp
,
wg
*
sync
.
WaitGroup
,
mu
*
sync
.
Mutex
,
findMapMutex
*
sync
.
RWMutex
)
{
defer
wg
.
Done
()
for
{
...
...
@@ -519,14 +564,13 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal, error = %v"
,
err
)
log
.
Printf
(
"Failed to unmarshal message header, error = %v"
,
err
)
continue
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
tsMsg
==
nil
&&
err
!=
nil
{
panic
(
"null unMarshalFunc for "
+
headerMsg
.
Base
.
MsgType
.
String
()
+
" msg type"
)
}
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal, error = %v"
,
err
)
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
continue
}
// set pulsar info to tsMsg
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
...
...
@@ -534,9 +578,9 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
m
u
.
Lock
()
m
s
.
unsolvedMutex
.
Lock
()
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
m
u
.
Unlock
()
m
s
.
unsolvedMutex
.
Unlock
()
if
headerMsg
.
Base
.
MsgType
==
commonpb
.
MsgType_kTimeTick
{
findMapMutex
.
Lock
()
...
...
@@ -549,50 +593,60 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
}
func
(
ms
*
PulsarTtMsgStream
)
Seek
(
mp
*
internalpb2
.
MsgPosition
)
error
{
var
consumer
Consumer
var
messageID
MessageID
for
index
,
channel
:=
range
ms
.
consumerChannels
{
if
filepath
.
Base
(
channel
)
==
filepath
.
Base
(
mp
.
ChannelName
)
{
messageID
,
err
:=
typeutil
.
StringToPulsarMsgID
(
mp
.
MsgID
)
if
err
!=
nil
{
return
err
}
consumer
:=
ms
.
consumers
[
index
]
err
=
(
consumer
)
.
Seek
(
messageID
)
seekMsgID
,
err
:=
typeutil
.
StringToPulsarMsgID
(
mp
.
MsgID
)
if
err
!=
nil
{
return
err
}
consumer
=
ms
.
consumers
[
index
]
messageID
=
seekMsgID
break
}
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
nil
case
pulsarMsg
,
ok
:=
<-
consumer
.
Chan
()
:
if
!
ok
{
return
errors
.
New
(
"consumer closed"
)
}
consumer
.
Ack
(
pulsarMsg
)
if
consumer
!=
nil
{
err
:=
(
consumer
)
.
Seek
(
messageID
)
if
err
!=
nil
{
return
err
}
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal msgHeader, error = %v"
,
err
)
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
tsMsg
==
nil
&&
err
!=
nil
{
panic
(
"null unMarshalFunc for "
+
headerMsg
.
Base
.
MsgType
.
String
()
+
" msg type"
)
ms
.
unsolvedMutex
.
Lock
()
ms
.
unsolvedBuf
[
consumer
]
=
make
([]
TsMsg
,
0
)
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
nil
case
pulsarMsg
,
ok
:=
<-
consumer
.
Chan
()
:
if
!
ok
{
return
errors
.
New
(
"consumer closed"
)
}
consumer
.
Ack
(
pulsarMsg
)
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal message header, error = %v"
,
err
)
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
}
if
tsMsg
.
Type
()
==
commonpb
.
MsgType_kTimeTick
{
if
tsMsg
.
BeginTs
()
>=
mp
.
Timestamp
{
ms
.
unsolvedMutex
.
Unlock
()
return
nil
}
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal pulsarMsg, error = %v"
,
err
)
}
if
tsMsg
.
Type
()
==
commonpb
.
MsgType_kTimeTick
{
if
tsMsg
.
BeginTs
()
>=
mp
.
Timestamp
{
return
nil
}
continue
}
if
tsMsg
.
BeginTs
()
>
mp
.
Timestamp
{
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
}
continue
}
if
tsMsg
.
BeginTs
()
>
mp
.
Timestamp
{
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
ChannelName
:
filepath
.
Base
(
pulsarMsg
.
Topic
()),
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
}
}
}
...
...
internal/msgstream/pulsarms/pulsar_msgstream_test.go
浏览文件 @
57ba2361
...
...
@@ -568,7 +568,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
msgPack3
.
Msgs
=
append
(
msgPack3
.
Msgs
,
getTsMsg
(
commonpb
.
MsgType_kInsert
,
9
,
9
))
msgPack4
:=
MsgPack
{}
msgPack4
.
Msgs
=
append
(
msgPack
2
.
Msgs
,
getTimeTickMsg
(
11
,
11
,
11
))
msgPack4
.
Msgs
=
append
(
msgPack
4
.
Msgs
,
getTimeTickMsg
(
11
,
11
,
11
))
msgPack5
:=
MsgPack
{}
msgPack5
.
Msgs
=
append
(
msgPack5
.
Msgs
,
getTimeTickMsg
(
15
,
15
,
15
))
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录