From 4c4b2903f9685a5074a7d5d01029683b93e7ec7e Mon Sep 17 00:00:00 2001 From: chyezh Date: Mon, 14 Aug 2023 14:21:32 +0800 Subject: [PATCH] [Fixup] nats log configuration, open nats log at info level by deafult (#26168) Signed-off-by: chyezh --- configs/milvus.yaml | 6 +- pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go | 41 +++++++++--- .../mqwrapper/nmq/nmq_server_test.go | 64 ++++++++++++++++++- pkg/util/paramtable/service_param.go | 25 +++++--- 4 files changed, 114 insertions(+), 22 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 265fdadd1..11daf95af 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -137,6 +137,7 @@ rocksmq: compressionTypes: [0, 0, 7, 7, 7] # natsmq configuration. +# more detail: https://docs.nats.io/running-a-nats-service/configuration natsmq: server: # server side configuration for natsmq. port: 4222 # 4222 by default, Port for nats server listening. @@ -146,10 +147,11 @@ natsmq: maxPending: 67108864 # (B) 64MB by default, Maximum number of bytes buffered for a connection Applies to client connections. initializeTimeout: 4000 # (ms) 4s by default, waiting for initialization of natsmq finished. monitor: + trace: false # false by default, If true enable protocol trace log messages. debug: false # false by default, If true enable debug log messages. logTime: true # true by default, If set to false, log without timestamps. - logFile: # no log file by default, Log file path relative to.. . - logSizeLimit: 0 # (B) 0, unlimited by default, Size in bytes after the log file rolls over to a new one. + logFile: /tmp/milvus/logs/nats.log # /tmp/milvus/logs/nats.log by default, Log file path relative to .. of milvus binary if use relative path. + logSizeLimit: 536870912 # (B) 512MB by default, Size in bytes after the log file rolls over to a new one. retention: maxAge: 4320 # (min) 3 days by default, Maximum age of any message in the P-channel. maxBytes: # (B) None by default, How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size. diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go index e3d36af60..bbd963139 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go @@ -17,9 +17,12 @@ package nmq import ( + "os" + "path" "sync" "time" + "github.com/cockroachdb/errors" "github.com/nats-io/nats-server/v2/server" "go.uber.org/zap" @@ -43,21 +46,38 @@ type NatsMQConfig struct { // Panic if initailizing operation failed. func MustInitNatsMQ(cfg *NatsMQConfig) { once.Do(func() { - log.Info("try to initialize global nmq", zap.Any("config", cfg)) var err error - Nmq, err = server.NewServer(&cfg.Opts) + Nmq, err = initNatsMQ(cfg) if err != nil { - log.Fatal("fail to initailize nmq", zap.Error(err)) + log.Fatal("initialize nmq failed", zap.Error(err)) } + }) +} - // Start Nmq in background and wait until it's ready for connection. - go Nmq.Start() - // Wait for server to be ready for connections - if !Nmq.ReadyForConnections(cfg.InitializeTimeout) { - log.Fatal("nmq is not ready within timeout") +func initNatsMQ(cfg *NatsMQConfig) (*server.Server, error) { + log.Info("try to initialize global nmq", zap.Any("config", cfg)) + natsServer, err := server.NewServer(&cfg.Opts) + if err != nil { + return nil, errors.Wrap(err, "fail to initailize nmq") + } + // Config log if log file set. + if cfg.Opts.LogFile != "" { + if err := os.MkdirAll(path.Dir(cfg.Opts.LogFile), 0o744); err != nil { + return nil, errors.Wrap(err, "fail to create directory for nats log file") } - log.Info("initialize nmq finished", zap.String("client-url", Nmq.ClientURL()), zap.Error(err)) - }) + // make directory for the file + natsServer.ConfigureLogger() + } + // Start Nmq in background and wait until it's ready for connection. + if err := server.Run(natsServer); err != nil { + return nil, errors.Wrap(err, "start nmq failed") + } + // Wait for server to be ready for connections + if !natsServer.ReadyForConnections(cfg.InitializeTimeout) { + return nil, errors.New("nmq is not ready within timeout") + } + log.Info("initialize nmq finished", zap.String("client-url", natsServer.ClientURL()), zap.Error(err)) + return natsServer, nil } // ParseServerOption get nats server option from paramstable. @@ -71,6 +91,7 @@ func ParseServerOption(params *paramtable.ComponentParam) *NatsMQConfig { JetStream: true, JetStreamMaxStore: params.NatsmqCfg.ServerMaxFileStore.GetAsInt64(), StoreDir: params.NatsmqCfg.ServerStoreDir.GetValue(), + Trace: params.NatsmqCfg.ServerMonitorTrace.GetAsBool(), Debug: params.NatsmqCfg.ServerMonitorDebug.GetAsBool(), Logtime: params.NatsmqCfg.ServerMonitorLogTime.GetAsBool(), LogFile: params.NatsmqCfg.ServerMonitorLogFile.GetValue(), diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go index 205d5b926..53a5b1509 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go @@ -17,11 +17,13 @@ package nmq import ( + "fmt" "os" "testing" "time" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/nats-io/nats-server/v2/server" "github.com/stretchr/testify/assert" ) @@ -34,6 +36,7 @@ func TestMain(m *testing.M) { defer os.RemoveAll(storeDir) cfg := ParseServerOption(paramtable.Get()) + cfg.Opts.Port = server.RANDOM_PORT cfg.Opts.StoreDir = storeDir MustInitNatsMQ(cfg) defer CloseNatsMQ() @@ -43,6 +46,62 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } +func TestInitNatsMQ(t *testing.T) { + func() { + cfg := ParseServerOption(paramtable.Get()) + storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") + defer os.RemoveAll(storeDir) + cfg.Opts.StoreDir = storeDir + cfg.Opts.Port = server.RANDOM_PORT + cfg.Opts.LogFile = "" + mq, err := initNatsMQ(cfg) + assert.NoError(t, err) + assert.NotNil(t, mq) + mq.Shutdown() + mq.WaitForShutdown() + }() + + func() { + cfg := ParseServerOption(paramtable.Get()) + storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") + defer os.RemoveAll(storeDir) + cfg.Opts.StoreDir = storeDir + cfg.Opts.Port = server.RANDOM_PORT + cfg.Opts.LogFile = "" + mq, err := initNatsMQ(cfg) + assert.NoError(t, err) + assert.NotNil(t, mq) + mq.Shutdown() + mq.WaitForShutdown() + }() + + func() { + cfg := ParseServerOption(paramtable.Get()) + storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") + defer os.RemoveAll(storeDir) + cfg.Opts.StoreDir = storeDir + cfg.Opts.Port = server.RANDOM_PORT + cfg.Opts.MaxPending = -1 + mq, err := initNatsMQ(cfg) + assert.Error(t, err) + assert.Nil(t, mq) + }() + + func() { + ex, err := os.Executable() + assert.NoError(t, err) + cfg := ParseServerOption(paramtable.Get()) + storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") + defer os.RemoveAll(storeDir) + cfg.Opts.StoreDir = storeDir + cfg.Opts.Port = server.RANDOM_PORT + cfg.Opts.LogFile = fmt.Sprintf("%s/test", ex) + mq, err := initNatsMQ(cfg) + assert.Error(t, err) + assert.Nil(t, mq) + }() +} + func TestGetServerOptionDefault(t *testing.T) { cfg := ParseServerOption(paramtable.Get()) assert.Equal(t, "127.0.0.1", cfg.Opts.Host) @@ -54,7 +113,8 @@ func TestGetServerOptionDefault(t *testing.T) { assert.Equal(t, int64(67108864), cfg.Opts.MaxPending) assert.Equal(t, 4000*time.Millisecond, cfg.InitializeTimeout) assert.Equal(t, false, cfg.Opts.Debug) + assert.Equal(t, false, cfg.Opts.Trace) assert.Equal(t, true, cfg.Opts.Logtime) - assert.Equal(t, "", cfg.Opts.LogFile) - assert.Equal(t, int64(0), cfg.Opts.LogSizeLimit) + assert.Equal(t, "/tmp/milvus/logs/nats.log", cfg.Opts.LogFile) + assert.Equal(t, int64(536870912), cfg.Opts.LogSizeLimit) } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 8508bd8b6..cfa2a1800 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -723,6 +723,7 @@ type NatsmqConfig struct { ServerMaxPayload ParamItem `refreshable:"false"` ServerMaxPending ParamItem `refreshable:"false"` ServerInitializeTimeout ParamItem `refreshable:"false"` + ServerMonitorTrace ParamItem `refreshable:"false"` ServerMonitorDebug ParamItem `refreshable:"false"` ServerMonitorLogTime ParamItem `refreshable:"false"` ServerMonitorLogFile ParamItem `refreshable:"false"` @@ -743,10 +744,11 @@ func (r *NatsmqConfig) Init(base *BaseTable) { } r.ServerPort.Init(base.mgr) r.ServerStoreDir = ParamItem{ - Key: "natsmq.server.storeDir", - Version: "2.3.0", - Doc: `Directory to use for JetStream storage of nats`, - Export: true, + Key: "natsmq.server.storeDir", + DefaultValue: "/var/lib/milvus/nats", + Version: "2.3.0", + Doc: `Directory to use for JetStream storage of nats`, + Export: true, } r.ServerStoreDir.Init(base.mgr) r.ServerMaxFileStore = ParamItem{ @@ -781,7 +783,14 @@ func (r *NatsmqConfig) Init(base *BaseTable) { Export: true, } r.ServerInitializeTimeout.Init(base.mgr) - + r.ServerMonitorTrace = ParamItem{ + Key: "natsmq.server.monitor.trace", + Version: "2.3.0", + DefaultValue: "false", + Doc: `If true enable protocol trace log messages`, + Export: true, + } + r.ServerMonitorTrace.Init(base.mgr) r.ServerMonitorDebug = ParamItem{ Key: "natsmq.server.monitor.debug", Version: "2.3.0", @@ -801,15 +810,15 @@ func (r *NatsmqConfig) Init(base *BaseTable) { r.ServerMonitorLogFile = ParamItem{ Key: "natsmq.server.monitor.logFile", Version: "2.3.0", - DefaultValue: "", - Doc: `Log file path relative to..`, + DefaultValue: "/tmp/milvus/logs/nats.log", + Doc: `Log file path relative to .. of milvus binary if use relative path`, Export: true, } r.ServerMonitorLogFile.Init(base.mgr) r.ServerMonitorLogSizeLimit = ParamItem{ Key: "natsmq.server.monitor.logSizeLimit", Version: "2.3.0", - DefaultValue: "0", + DefaultValue: "536870912", Doc: `Size in bytes after the log file rolls over to a new one`, Export: true, } -- GitLab