未验证 提交 764cf1a5 编写于 作者: Y yukun 提交者: GitHub

Add rocksmq server close when standalone exit (#12255)

Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>
上级 26405d13
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"syscall" "syscall"
"github.com/milvus-io/milvus/internal/util/healthz" "github.com/milvus-io/milvus/internal/util/healthz"
"github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -56,6 +57,15 @@ func newMsgFactory(localMsg bool) msgstream.Factory { ...@@ -56,6 +57,15 @@ func newMsgFactory(localMsg bool) msgstream.Factory {
return msgstream.NewPmsFactory() return msgstream.NewPmsFactory()
} }
func initRocksmq() error {
err := rocksmq.InitRocksMQ()
return err
}
func stopRocksmq() {
rocksmq.CloseRocksMQ()
}
// MilvusRoles determines to run which components. // MilvusRoles determines to run which components.
type MilvusRoles struct { type MilvusRoles struct {
EnableRootCoord bool `env:"ENABLE_ROOT_COORD"` EnableRootCoord bool `env:"ENABLE_ROOT_COORD"`
...@@ -349,6 +359,12 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) { ...@@ -349,6 +359,12 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) {
cfg := mr.setLogConfigFilename("standalone.log") cfg := mr.setLogConfigFilename("standalone.log")
logutil.SetupLogger(cfg) logutil.SetupLogger(cfg)
defer log.Sync() defer log.Sync()
err := initRocksmq()
if err != nil {
panic(err)
}
defer stopRocksmq()
} else { } else {
err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode) err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode)
if err != nil { if err != nil {
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
package rocksmq package rocksmq
import ( import (
"errors"
"os" "os"
"strconv" "strconv"
"sync" "sync"
...@@ -43,26 +44,33 @@ func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error { ...@@ -43,26 +44,33 @@ func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
// InitRocksMQ init global rocksmq single instance // InitRocksMQ init global rocksmq single instance
func InitRocksMQ() error { func InitRocksMQ() error {
var err error var finalErr error
once.Do(func() { once.Do(func() {
params.Init() params.Init()
rocksdbName, _ := params.Load("_RocksmqPath") rocksdbName, _ := params.Load("_RocksmqPath")
log.Debug("RocksmqPath=" + rocksdbName) log.Debug("RocksmqPath=" + rocksdbName)
_, err = os.Stat(rocksdbName) var fi os.FileInfo
if os.IsNotExist(err) { fi, finalErr = os.Stat(rocksdbName)
err = os.MkdirAll(rocksdbName, os.ModePerm) if os.IsNotExist(finalErr) {
if err != nil { finalErr = os.MkdirAll(rocksdbName, os.ModePerm)
errMsg := "Create dir " + rocksdbName + " failed" if finalErr != nil {
panic(errMsg) return
}
} else {
if !fi.IsDir() {
errMsg := "can't create a directory because there exists a file with the same name"
finalErr = errors.New(errMsg)
return
} }
} }
kvname := rocksdbName + "_kv" kvname := rocksdbName + "_kv"
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvname) var rkv *rocksdbkv.RocksdbKV
if err != nil { rkv, finalErr = rocksdbkv.NewRocksdbKV(kvname)
panic(err) if finalErr != nil {
return
} }
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rkv)
_ = idAllocator.Initialize() _ = idAllocator.Initialize()
rawRmqPageSize, err := params.Load("rocksmq.rocksmqPageSize") rawRmqPageSize, err := params.Load("rocksmq.rocksmqPageSize")
...@@ -94,12 +102,9 @@ func InitRocksMQ() error { ...@@ -94,12 +102,9 @@ func InitRocksMQ() error {
} }
log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes),
zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize)) zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize))
Rmq, err = NewRocksMQ(rocksdbName, idAllocator) Rmq, finalErr = NewRocksMQ(rocksdbName, idAllocator)
if err != nil {
panic(err)
}
}) })
return err return finalErr
} }
// CloseRocksMQ is used to close global rocksmq // CloseRocksMQ is used to close global rocksmq
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"log" "log"
"os" "os"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/allocator"
...@@ -69,3 +70,13 @@ func Test_InitRocksMQ(t *testing.T) { ...@@ -69,3 +70,13 @@ func Test_InitRocksMQ(t *testing.T) {
} }
Rmq.RegisterConsumer(consumer) Rmq.RegisterConsumer(consumer)
} }
func Test_InitRocksMQError(t *testing.T) {
once = sync.Once{}
dummyPath := "/tmp/milvus/dummy"
os.Create(dummyPath)
os.Setenv("ROCKSMQ_PATH", dummyPath)
defer os.RemoveAll(dummyPath)
err := InitRocksMQ()
assert.Error(t, err)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册