Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
4c4b2903
M
milvus
项目概览
milvus
/
milvus
9 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
4c4b2903
编写于
8月 14, 2023
作者:
C
chyezh
提交者:
GitHub
8月 14, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[Fixup] nats log configuration, open nats log at info level by deafult (#26168)
Signed-off-by:
N
chyezh
<
ye.zhen@zilliz.com
>
上级
9cb5943e
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
114 addition
and
22 deletion
+114
-22
configs/milvus.yaml
configs/milvus.yaml
+4
-2
pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go
pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go
+31
-10
pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go
pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go
+62
-2
pkg/util/paramtable/service_param.go
pkg/util/paramtable/service_param.go
+17
-8
未找到文件。
configs/milvus.yaml
浏览文件 @
4c4b2903
...
...
@@ -137,6 +137,7 @@ rocksmq:
compressionTypes
:
[
0
,
0
,
7
,
7
,
7
]
# natsmq configuration.
# more detail: https://docs.nats.io/running-a-nats-service/configuration
natsmq
:
server
:
# server side configuration for natsmq.
port
:
4222
# 4222 by default, Port for nats server listening.
...
...
@@ -146,10 +147,11 @@ natsmq:
maxPending
:
67108864
# (B) 64MB by default, Maximum number of bytes buffered for a connection Applies to client connections.
initializeTimeout
:
4000
# (ms) 4s by default, waiting for initialization of natsmq finished.
monitor
:
trace
:
false
# false by default, If true enable protocol trace log messages.
debug
:
false
# false by default, If true enable debug log messages.
logTime
:
true
# true by default, If set to false, log without timestamps.
logFile
:
# no log file by default, Log file path relative to..
.
logSizeLimit
:
0
# (B) 0, unlimited
by default, Size in bytes after the log file rolls over to a new one.
logFile
:
/tmp/milvus/logs/nats.log
# /tmp/milvus/logs/nats.log by default, Log file path relative to .. of milvus binary if use relative path
.
logSizeLimit
:
536870912
# (B) 512MB
by default, Size in bytes after the log file rolls over to a new one.
retention
:
maxAge
:
4320
# (min) 3 days by default, Maximum age of any message in the P-channel.
maxBytes
:
# (B) None by default, How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size.
...
...
pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go
浏览文件 @
4c4b2903
...
...
@@ -17,9 +17,12 @@
package
nmq
import
(
"os"
"path"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/nats-io/nats-server/v2/server"
"go.uber.org/zap"
...
...
@@ -43,21 +46,38 @@ type NatsMQConfig struct {
// Panic if initailizing operation failed.
func
MustInitNatsMQ
(
cfg
*
NatsMQConfig
)
{
once
.
Do
(
func
()
{
log
.
Info
(
"try to initialize global nmq"
,
zap
.
Any
(
"config"
,
cfg
))
var
err
error
Nmq
,
err
=
server
.
NewServer
(
&
cfg
.
Opts
)
Nmq
,
err
=
initNatsMQ
(
cfg
)
if
err
!=
nil
{
log
.
Fatal
(
"
fail to initailize nmq
"
,
zap
.
Error
(
err
))
log
.
Fatal
(
"
initialize nmq failed
"
,
zap
.
Error
(
err
))
}
})
}
// Start Nmq in background and wait until it's ready for connection.
go
Nmq
.
Start
()
// Wait for server to be ready for connections
if
!
Nmq
.
ReadyForConnections
(
cfg
.
InitializeTimeout
)
{
log
.
Fatal
(
"nmq is not ready within timeout"
)
func
initNatsMQ
(
cfg
*
NatsMQConfig
)
(
*
server
.
Server
,
error
)
{
log
.
Info
(
"try to initialize global nmq"
,
zap
.
Any
(
"config"
,
cfg
))
natsServer
,
err
:=
server
.
NewServer
(
&
cfg
.
Opts
)
if
err
!=
nil
{
return
nil
,
errors
.
Wrap
(
err
,
"fail to initailize nmq"
)
}
// Config log if log file set.
if
cfg
.
Opts
.
LogFile
!=
""
{
if
err
:=
os
.
MkdirAll
(
path
.
Dir
(
cfg
.
Opts
.
LogFile
),
0
o744
);
err
!=
nil
{
return
nil
,
errors
.
Wrap
(
err
,
"fail to create directory for nats log file"
)
}
log
.
Info
(
"initialize nmq finished"
,
zap
.
String
(
"client-url"
,
Nmq
.
ClientURL
()),
zap
.
Error
(
err
))
})
// make directory for the file
natsServer
.
ConfigureLogger
()
}
// Start Nmq in background and wait until it's ready for connection.
if
err
:=
server
.
Run
(
natsServer
);
err
!=
nil
{
return
nil
,
errors
.
Wrap
(
err
,
"start nmq failed"
)
}
// Wait for server to be ready for connections
if
!
natsServer
.
ReadyForConnections
(
cfg
.
InitializeTimeout
)
{
return
nil
,
errors
.
New
(
"nmq is not ready within timeout"
)
}
log
.
Info
(
"initialize nmq finished"
,
zap
.
String
(
"client-url"
,
natsServer
.
ClientURL
()),
zap
.
Error
(
err
))
return
natsServer
,
nil
}
// ParseServerOption get nats server option from paramstable.
...
...
@@ -71,6 +91,7 @@ func ParseServerOption(params *paramtable.ComponentParam) *NatsMQConfig {
JetStream
:
true
,
JetStreamMaxStore
:
params
.
NatsmqCfg
.
ServerMaxFileStore
.
GetAsInt64
(),
StoreDir
:
params
.
NatsmqCfg
.
ServerStoreDir
.
GetValue
(),
Trace
:
params
.
NatsmqCfg
.
ServerMonitorTrace
.
GetAsBool
(),
Debug
:
params
.
NatsmqCfg
.
ServerMonitorDebug
.
GetAsBool
(),
Logtime
:
params
.
NatsmqCfg
.
ServerMonitorLogTime
.
GetAsBool
(),
LogFile
:
params
.
NatsmqCfg
.
ServerMonitorLogFile
.
GetValue
(),
...
...
pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go
浏览文件 @
4c4b2903
...
...
@@ -17,11 +17,13 @@
package
nmq
import
(
"fmt"
"os"
"testing"
"time"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/nats-io/nats-server/v2/server"
"github.com/stretchr/testify/assert"
)
...
...
@@ -34,6 +36,7 @@ func TestMain(m *testing.M) {
defer
os
.
RemoveAll
(
storeDir
)
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
cfg
.
Opts
.
Port
=
server
.
RANDOM_PORT
cfg
.
Opts
.
StoreDir
=
storeDir
MustInitNatsMQ
(
cfg
)
defer
CloseNatsMQ
()
...
...
@@ -43,6 +46,62 @@ func TestMain(m *testing.M) {
os
.
Exit
(
exitCode
)
}
func
TestInitNatsMQ
(
t
*
testing
.
T
)
{
func
()
{
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
storeDir
,
_
:=
os
.
MkdirTemp
(
""
,
"milvus_mq_nmq"
)
defer
os
.
RemoveAll
(
storeDir
)
cfg
.
Opts
.
StoreDir
=
storeDir
cfg
.
Opts
.
Port
=
server
.
RANDOM_PORT
cfg
.
Opts
.
LogFile
=
""
mq
,
err
:=
initNatsMQ
(
cfg
)
assert
.
NoError
(
t
,
err
)
assert
.
NotNil
(
t
,
mq
)
mq
.
Shutdown
()
mq
.
WaitForShutdown
()
}()
func
()
{
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
storeDir
,
_
:=
os
.
MkdirTemp
(
""
,
"milvus_mq_nmq"
)
defer
os
.
RemoveAll
(
storeDir
)
cfg
.
Opts
.
StoreDir
=
storeDir
cfg
.
Opts
.
Port
=
server
.
RANDOM_PORT
cfg
.
Opts
.
LogFile
=
""
mq
,
err
:=
initNatsMQ
(
cfg
)
assert
.
NoError
(
t
,
err
)
assert
.
NotNil
(
t
,
mq
)
mq
.
Shutdown
()
mq
.
WaitForShutdown
()
}()
func
()
{
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
storeDir
,
_
:=
os
.
MkdirTemp
(
""
,
"milvus_mq_nmq"
)
defer
os
.
RemoveAll
(
storeDir
)
cfg
.
Opts
.
StoreDir
=
storeDir
cfg
.
Opts
.
Port
=
server
.
RANDOM_PORT
cfg
.
Opts
.
MaxPending
=
-
1
mq
,
err
:=
initNatsMQ
(
cfg
)
assert
.
Error
(
t
,
err
)
assert
.
Nil
(
t
,
mq
)
}()
func
()
{
ex
,
err
:=
os
.
Executable
()
assert
.
NoError
(
t
,
err
)
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
storeDir
,
_
:=
os
.
MkdirTemp
(
""
,
"milvus_mq_nmq"
)
defer
os
.
RemoveAll
(
storeDir
)
cfg
.
Opts
.
StoreDir
=
storeDir
cfg
.
Opts
.
Port
=
server
.
RANDOM_PORT
cfg
.
Opts
.
LogFile
=
fmt
.
Sprintf
(
"%s/test"
,
ex
)
mq
,
err
:=
initNatsMQ
(
cfg
)
assert
.
Error
(
t
,
err
)
assert
.
Nil
(
t
,
mq
)
}()
}
func
TestGetServerOptionDefault
(
t
*
testing
.
T
)
{
cfg
:=
ParseServerOption
(
paramtable
.
Get
())
assert
.
Equal
(
t
,
"127.0.0.1"
,
cfg
.
Opts
.
Host
)
...
...
@@ -54,7 +113,8 @@ func TestGetServerOptionDefault(t *testing.T) {
assert
.
Equal
(
t
,
int64
(
67108864
),
cfg
.
Opts
.
MaxPending
)
assert
.
Equal
(
t
,
4000
*
time
.
Millisecond
,
cfg
.
InitializeTimeout
)
assert
.
Equal
(
t
,
false
,
cfg
.
Opts
.
Debug
)
assert
.
Equal
(
t
,
false
,
cfg
.
Opts
.
Trace
)
assert
.
Equal
(
t
,
true
,
cfg
.
Opts
.
Logtime
)
assert
.
Equal
(
t
,
""
,
cfg
.
Opts
.
LogFile
)
assert
.
Equal
(
t
,
int64
(
0
),
cfg
.
Opts
.
LogSizeLimit
)
assert
.
Equal
(
t
,
"
/tmp/milvus/logs/nats.log
"
,
cfg
.
Opts
.
LogFile
)
assert
.
Equal
(
t
,
int64
(
536870912
),
cfg
.
Opts
.
LogSizeLimit
)
}
pkg/util/paramtable/service_param.go
浏览文件 @
4c4b2903
...
...
@@ -723,6 +723,7 @@ type NatsmqConfig struct {
ServerMaxPayload
ParamItem
`refreshable:"false"`
ServerMaxPending
ParamItem
`refreshable:"false"`
ServerInitializeTimeout
ParamItem
`refreshable:"false"`
ServerMonitorTrace
ParamItem
`refreshable:"false"`
ServerMonitorDebug
ParamItem
`refreshable:"false"`
ServerMonitorLogTime
ParamItem
`refreshable:"false"`
ServerMonitorLogFile
ParamItem
`refreshable:"false"`
...
...
@@ -743,10 +744,11 @@ func (r *NatsmqConfig) Init(base *BaseTable) {
}
r
.
ServerPort
.
Init
(
base
.
mgr
)
r
.
ServerStoreDir
=
ParamItem
{
Key
:
"natsmq.server.storeDir"
,
Version
:
"2.3.0"
,
Doc
:
`Directory to use for JetStream storage of nats`
,
Export
:
true
,
Key
:
"natsmq.server.storeDir"
,
DefaultValue
:
"/var/lib/milvus/nats"
,
Version
:
"2.3.0"
,
Doc
:
`Directory to use for JetStream storage of nats`
,
Export
:
true
,
}
r
.
ServerStoreDir
.
Init
(
base
.
mgr
)
r
.
ServerMaxFileStore
=
ParamItem
{
...
...
@@ -781,7 +783,14 @@ func (r *NatsmqConfig) Init(base *BaseTable) {
Export
:
true
,
}
r
.
ServerInitializeTimeout
.
Init
(
base
.
mgr
)
r
.
ServerMonitorTrace
=
ParamItem
{
Key
:
"natsmq.server.monitor.trace"
,
Version
:
"2.3.0"
,
DefaultValue
:
"false"
,
Doc
:
`If true enable protocol trace log messages`
,
Export
:
true
,
}
r
.
ServerMonitorTrace
.
Init
(
base
.
mgr
)
r
.
ServerMonitorDebug
=
ParamItem
{
Key
:
"natsmq.server.monitor.debug"
,
Version
:
"2.3.0"
,
...
...
@@ -801,15 +810,15 @@ func (r *NatsmqConfig) Init(base *BaseTable) {
r
.
ServerMonitorLogFile
=
ParamItem
{
Key
:
"natsmq.server.monitor.logFile"
,
Version
:
"2.3.0"
,
DefaultValue
:
""
,
Doc
:
`Log file path relative to
..
`
,
DefaultValue
:
"
/tmp/milvus/logs/nats.log
"
,
Doc
:
`Log file path relative to
.. of milvus binary if use relative path
`
,
Export
:
true
,
}
r
.
ServerMonitorLogFile
.
Init
(
base
.
mgr
)
r
.
ServerMonitorLogSizeLimit
=
ParamItem
{
Key
:
"natsmq.server.monitor.logSizeLimit"
,
Version
:
"2.3.0"
,
DefaultValue
:
"
0
"
,
DefaultValue
:
"
536870912
"
,
Doc
:
`Size in bytes after the log file rolls over to a new one`
,
Export
:
true
,
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录