Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
c231f6da
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c231f6da
编写于
2月 05, 2021
作者:
G
godchen
提交者:
yefu.chen
2月 05, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix collectionName not found error
Signed-off-by:
N
godchen
<
qingxiang.chen@zilliz.com
>
上级
07167f41
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
87 addition
and
135 deletion
+87
-135
internal/masterservice/task.go
internal/masterservice/task.go
+7
-1
internal/msgstream/pulsarms/pulsar_msgstream.go
internal/msgstream/pulsarms/pulsar_msgstream.go
+78
-132
internal/msgstream/pulsarms/pulsar_msgstream_test.go
internal/msgstream/pulsarms/pulsar_msgstream_test.go
+1
-1
tests/python/requirements.txt
tests/python/requirements.txt
+1
-1
未找到文件。
internal/masterservice/task.go
浏览文件 @
c231f6da
...
...
@@ -457,7 +457,13 @@ func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
}
func
(
t
*
ShowPartitionReqTask
)
Execute
()
error
{
coll
,
err
:=
t
.
core
.
MetaTable
.
GetCollectionByName
(
t
.
Req
.
CollectionName
)
var
coll
*
etcdpb
.
CollectionInfo
var
err
error
if
t
.
Req
.
CollectionName
==
""
{
coll
,
err
=
t
.
core
.
MetaTable
.
GetCollectionByID
(
t
.
Req
.
CollectionID
)
}
else
{
coll
,
err
=
t
.
core
.
MetaTable
.
GetCollectionByName
(
t
.
Req
.
CollectionName
)
}
if
err
!=
nil
{
return
err
}
...
...
internal/msgstream/pulsarms/pulsar_msgstream.go
浏览文件 @
c231f6da
...
...
@@ -32,7 +32,6 @@ type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg
type
RepackFunc
=
msgstream
.
RepackFunc
type
Consumer
=
pulsar
.
Consumer
type
Producer
=
pulsar
.
Producer
type
MessageID
=
pulsar
.
MessageID
type
UnmarshalDispatcher
=
msgstream
.
UnmarshalDispatcher
type
PulsarMsgStream
struct
{
...
...
@@ -47,8 +46,6 @@ type PulsarMsgStream struct {
wait
*
sync
.
WaitGroup
streamCancel
func
()
pulsarBufSize
int64
consumerLock
*
sync
.
Mutex
consumerReflects
[]
reflect
.
SelectCase
}
func
newPulsarMsgStream
(
ctx
context
.
Context
,
...
...
@@ -61,30 +58,22 @@ func newPulsarMsgStream(ctx context.Context,
producers
:=
make
([]
Producer
,
0
)
consumers
:=
make
([]
Consumer
,
0
)
consumerChannels
:=
make
([]
string
,
0
)
consumerReflects
:=
make
([]
reflect
.
SelectCase
,
0
)
receiveBuf
:=
make
(
chan
*
MsgPack
,
receiveBufSize
)
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
if
err
!=
nil
{
defer
streamCancel
()
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
stream
:=
&
PulsarMsgStream
{
ctx
:
streamCtx
,
client
:
client
,
streamCancel
:
streamCancel
,
producers
:
producers
,
consumers
:
consumers
,
consumerChannels
:
consumerChannels
,
unmarshal
:
unmarshal
,
pulsarBufSize
:
pulsarBufSize
,
receiveBuf
:
receiveBuf
,
streamCancel
:
streamCancel
,
consumerReflects
:
consumerReflects
,
consumerLock
:
&
sync
.
Mutex
{},
}
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
if
err
!=
nil
{
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
stream
.
client
=
client
stream
.
receiveBuf
=
make
(
chan
*
MsgPack
,
receiveBufSize
)
return
stream
,
nil
}
...
...
@@ -129,14 +118,7 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string,
return
errors
.
New
(
"pulsar is not ready, consumer is nil"
)
}
ms
.
consumerLock
.
Lock
()
ms
.
consumers
=
append
(
ms
.
consumers
,
pc
)
ms
.
consumerChannels
=
append
(
ms
.
consumerChannels
,
channels
[
i
])
ms
.
consumerReflects
=
append
(
ms
.
consumerReflects
,
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
pc
.
Chan
()),
})
ms
.
consumerLock
.
Unlock
()
return
nil
}
err
:=
util
.
Retry
(
10
,
time
.
Millisecond
*
200
,
fn
)
...
...
@@ -317,6 +299,12 @@ func (ms *PulsarMsgStream) Consume() *MsgPack {
func
(
ms
*
PulsarMsgStream
)
bufMsgPackToChannel
()
{
defer
ms
.
wait
.
Done
()
cases
:=
make
([]
reflect
.
SelectCase
,
len
(
ms
.
consumers
))
for
i
:=
0
;
i
<
len
(
ms
.
consumers
);
i
++
{
ch
:=
ms
.
consumers
[
i
]
.
Chan
()
cases
[
i
]
=
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
ch
)}
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
...
...
@@ -326,9 +314,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
tsMsgList
:=
make
([]
TsMsg
,
0
)
for
{
ms
.
consumerLock
.
Lock
()
chosen
,
value
,
ok
:=
reflect
.
Select
(
ms
.
consumerReflects
)
ms
.
consumerLock
.
Unlock
()
chosen
,
value
,
ok
:=
reflect
.
Select
(
cases
)
if
!
ok
{
log
.
Printf
(
"channel closed"
)
return
...
...
@@ -353,11 +339,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
continue
}
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
ChannelName
:
filepath
.
Base
(
pulsarMsg
.
Topic
()),
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
tsMsgList
=
append
(
tsMsgList
,
tsMsg
)
noMoreMessage
:=
true
...
...
@@ -405,7 +386,6 @@ func (ms *PulsarMsgStream) Seek(mp *internalpb2.MsgPosition) error {
type
PulsarTtMsgStream
struct
{
PulsarMsgStream
unsolvedBuf
map
[
Consumer
][]
TsMsg
unsolvedMutex
*
sync
.
Mutex
lastTimeStamp
Timestamp
}
...
...
@@ -414,53 +394,27 @@ func NewPulsarTtMsgStream(ctx context.Context,
receiveBufSize
int64
,
pulsarBufSize
int64
,
unmarshal
msgstream
.
UnmarshalDispatcher
)
(
*
PulsarTtMsgStream
,
error
)
{
pulsarMsgStream
,
err
:=
newPulsarMsgStream
(
ctx
,
address
,
receiveBufSize
,
pulsarBufSize
,
unmarshal
)
streamCtx
,
streamCancel
:=
context
.
WithCancel
(
ctx
)
pulsarMsgStream
:=
PulsarMsgStream
{
ctx
:
streamCtx
,
streamCancel
:
streamCancel
,
pulsarBufSize
:
pulsarBufSize
,
unmarshal
:
unmarshal
,
}
client
,
err
:=
pulsar
.
NewClient
(
pulsar
.
ClientOptions
{
URL
:
address
})
if
err
!=
nil
{
log
.
Printf
(
"Set pulsar client failed, error = %v"
,
err
)
return
nil
,
err
}
unsolvedBuf
:=
make
(
map
[
Consumer
][]
TsMsg
)
pulsarMsgStream
.
client
=
client
pulsarMsgStream
.
receiveBuf
=
make
(
chan
*
MsgPack
,
receiveBufSize
)
return
&
PulsarTtMsgStream
{
PulsarMsgStream
:
*
pulsarMsgStream
,
unsolvedBuf
:
unsolvedBuf
,
unsolvedMutex
:
&
sync
.
Mutex
{},
PulsarMsgStream
:
pulsarMsgStream
,
},
nil
}
func
(
ms
*
PulsarTtMsgStream
)
AsConsumer
(
channels
[]
string
,
subName
string
)
{
for
i
:=
0
;
i
<
len
(
channels
);
i
++
{
fn
:=
func
()
error
{
receiveChannel
:=
make
(
chan
pulsar
.
ConsumerMessage
,
ms
.
pulsarBufSize
)
pc
,
err
:=
ms
.
client
.
Subscribe
(
pulsar
.
ConsumerOptions
{
Topic
:
channels
[
i
],
SubscriptionName
:
subName
,
Type
:
pulsar
.
KeyShared
,
SubscriptionInitialPosition
:
pulsar
.
SubscriptionPositionEarliest
,
MessageChannel
:
receiveChannel
,
})
if
err
!=
nil
{
return
err
}
if
pc
==
nil
{
return
errors
.
New
(
"pulsar is not ready, consumer is nil"
)
}
ms
.
consumerLock
.
Lock
()
ms
.
consumers
=
append
(
ms
.
consumers
,
pc
)
ms
.
unsolvedBuf
[
pc
]
=
make
([]
TsMsg
,
0
)
ms
.
consumerChannels
=
append
(
ms
.
consumerChannels
,
channels
[
i
])
ms
.
consumerLock
.
Unlock
()
return
nil
}
err
:=
util
.
Retry
(
10
,
time
.
Millisecond
*
200
,
fn
)
if
err
!=
nil
{
errMsg
:=
"Failed to create consumer "
+
channels
[
i
]
+
", error = "
+
err
.
Error
()
panic
(
errMsg
)
}
}
}
func
(
ms
*
PulsarTtMsgStream
)
Start
()
{
ms
.
wait
=
&
sync
.
WaitGroup
{}
if
ms
.
consumers
!=
nil
{
...
...
@@ -474,32 +428,33 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
ms
.
unsolvedBuf
=
make
(
map
[
Consumer
][]
TsMsg
)
isChannelReady
:=
make
(
map
[
Consumer
]
bool
)
eofMsgTimeStamp
:=
make
(
map
[
Consumer
]
Timestamp
)
for
_
,
consumer
:=
range
ms
.
consumers
{
ms
.
unsolvedBuf
[
consumer
]
=
make
([]
TsMsg
,
0
)
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
default
:
wg
:=
sync
.
WaitGroup
{}
mu
:=
sync
.
Mutex
{}
findMapMutex
:=
sync
.
RWMutex
{}
ms
.
consumerLock
.
Lock
()
for
_
,
consumer
:=
range
ms
.
consumers
{
if
isChannelReady
[
consumer
]
{
continue
}
wg
.
Add
(
1
)
go
ms
.
findTimeTick
(
consumer
,
eofMsgTimeStamp
,
&
wg
,
&
findMapMutex
)
go
ms
.
findTimeTick
(
consumer
,
eofMsgTimeStamp
,
&
wg
,
&
mu
,
&
findMapMutex
)
}
wg
.
Wait
()
timeStamp
,
ok
:=
checkTimeTickMsg
(
eofMsgTimeStamp
,
isChannelReady
,
&
findMapMutex
)
ms
.
consumerLock
.
Unlock
()
if
!
ok
||
timeStamp
<=
ms
.
lastTimeStamp
{
//log.Printf("All timeTick's timestamps are inconsistent")
continue
}
timeTickBuf
:=
make
([]
TsMsg
,
0
)
msgPositions
:=
make
([]
*
internalpb2
.
MsgPosition
,
0
)
ms
.
unsolvedMutex
.
Lock
()
for
consumer
,
msgs
:=
range
ms
.
unsolvedBuf
{
tempBuffer
:=
make
([]
TsMsg
,
0
)
var
timeTickMsg
TsMsg
...
...
@@ -530,7 +485,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
})
}
}
ms
.
unsolvedMutex
.
Unlock
()
msgPack
:=
MsgPack
{
BeginTs
:
ms
.
lastTimeStamp
,
...
...
@@ -548,6 +502,7 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
func
(
ms
*
PulsarTtMsgStream
)
findTimeTick
(
consumer
Consumer
,
eofMsgMap
map
[
Consumer
]
Timestamp
,
wg
*
sync
.
WaitGroup
,
mu
*
sync
.
Mutex
,
findMapMutex
*
sync
.
RWMutex
)
{
defer
wg
.
Done
()
for
{
...
...
@@ -564,13 +519,14 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal message header, error = %v"
,
err
)
continue
log
.
Printf
(
"Failed to unmarshal, error = %v"
,
err
)
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
tsMsg
==
nil
&&
err
!=
nil
{
panic
(
"null unMarshalFunc for "
+
headerMsg
.
Base
.
MsgType
.
String
()
+
" msg type"
)
}
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
continue
log
.
Printf
(
"Failed to unmarshal, error = %v"
,
err
)
}
// set pulsar info to tsMsg
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
...
...
@@ -578,9 +534,9 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
m
s
.
unsolvedMutex
.
Lock
()
m
u
.
Lock
()
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
m
s
.
unsolvedMutex
.
Unlock
()
m
u
.
Unlock
()
if
headerMsg
.
Base
.
MsgType
==
commonpb
.
MsgType_kTimeTick
{
findMapMutex
.
Lock
()
...
...
@@ -593,60 +549,50 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
}
func
(
ms
*
PulsarTtMsgStream
)
Seek
(
mp
*
internalpb2
.
MsgPosition
)
error
{
var
consumer
Consumer
var
messageID
MessageID
for
index
,
channel
:=
range
ms
.
consumerChannels
{
if
filepath
.
Base
(
channel
)
==
filepath
.
Base
(
mp
.
ChannelName
)
{
seekMsgID
,
err
:=
typeutil
.
StringToPulsarMsgID
(
mp
.
MsgID
)
messageID
,
err
:=
typeutil
.
StringToPulsarMsgID
(
mp
.
MsgID
)
if
err
!=
nil
{
return
err
}
consumer
:=
ms
.
consumers
[
index
]
err
=
(
consumer
)
.
Seek
(
messageID
)
if
err
!=
nil
{
return
err
}
consumer
=
ms
.
consumers
[
index
]
messageID
=
seekMsgID
break
}
}
if
consumer
!=
nil
{
err
:=
(
consumer
)
.
Seek
(
messageID
)
if
err
!=
nil
{
return
err
}
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
nil
case
pulsarMsg
,
ok
:=
<-
consumer
.
Chan
()
:
if
!
ok
{
return
errors
.
New
(
"consumer closed"
)
}
consumer
.
Ack
(
pulsarMsg
)
ms
.
unsolvedMutex
.
Lock
()
ms
.
unsolvedBuf
[
consumer
]
=
make
([]
TsMsg
,
0
)
for
{
select
{
case
<-
ms
.
ctx
.
Done
()
:
return
nil
case
pulsarMsg
,
ok
:=
<-
consumer
.
Chan
()
:
if
!
ok
{
return
errors
.
New
(
"consumer closed"
)
}
consumer
.
Ack
(
pulsarMsg
)
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal msgHeader, error = %v"
,
err
)
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
tsMsg
==
nil
&&
err
!=
nil
{
panic
(
"null unMarshalFunc for "
+
headerMsg
.
Base
.
MsgType
.
String
()
+
" msg type"
)
headerMsg
:=
commonpb
.
MsgHeader
{}
err
:=
proto
.
Unmarshal
(
pulsarMsg
.
Payload
(),
&
headerMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal message header, error = %v"
,
err
)
}
tsMsg
,
err
:=
ms
.
unmarshal
.
Unmarshal
(
pulsarMsg
.
Payload
(),
headerMsg
.
Base
.
MsgType
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal tsMsg, error = %v"
,
err
)
}
if
tsMsg
.
Type
()
==
commonpb
.
MsgType_kTimeTick
{
if
tsMsg
.
BeginTs
()
>=
mp
.
Timestamp
{
ms
.
unsolvedMutex
.
Unlock
()
return
nil
}
continue
}
if
tsMsg
.
BeginTs
()
>
mp
.
Timestamp
{
tsMsg
.
SetPosition
(
&
msgstream
.
MsgPosition
{
ChannelName
:
filepath
.
Base
(
pulsarMsg
.
Topic
()),
MsgID
:
typeutil
.
PulsarMsgIDToString
(
pulsarMsg
.
ID
()),
})
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
if
err
!=
nil
{
log
.
Printf
(
"Failed to unmarshal pulsarMsg, error = %v"
,
err
)
}
if
tsMsg
.
Type
()
==
commonpb
.
MsgType_kTimeTick
{
if
tsMsg
.
BeginTs
()
>=
mp
.
Timestamp
{
return
nil
}
continue
}
if
tsMsg
.
BeginTs
()
>
mp
.
Timestamp
{
ms
.
unsolvedBuf
[
consumer
]
=
append
(
ms
.
unsolvedBuf
[
consumer
],
tsMsg
)
}
}
}
}
...
...
internal/msgstream/pulsarms/pulsar_msgstream_test.go
浏览文件 @
c231f6da
...
...
@@ -568,7 +568,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
msgPack3
.
Msgs
=
append
(
msgPack3
.
Msgs
,
getTsMsg
(
commonpb
.
MsgType_kInsert
,
9
,
9
))
msgPack4
:=
MsgPack
{}
msgPack4
.
Msgs
=
append
(
msgPack
4
.
Msgs
,
getTimeTickMsg
(
11
,
11
,
11
))
msgPack4
.
Msgs
=
append
(
msgPack
2
.
Msgs
,
getTimeTickMsg
(
11
,
11
,
11
))
msgPack5
:=
MsgPack
{}
msgPack5
.
Msgs
=
append
(
msgPack5
.
Msgs
,
getTimeTickMsg
(
15
,
15
,
15
))
...
...
tests/python/requirements.txt
浏览文件 @
c231f6da
...
...
@@ -2,7 +2,7 @@ grpcio==1.26.0
grpcio-tools==1.26.0
numpy==1.18.1
pytest-cov==2.8.1
pymilvus-distributed==0.0.
20
pymilvus-distributed==0.0.
19
sklearn==0.0
pytest==4.5.0
pytest-timeout==1.3.3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录