未验证 提交 f4e0736e 编写于 作者: G godchen 提交者: GitHub

Handle util rocksmq returned error (#9169)

Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 00630328
......@@ -155,7 +155,8 @@ func TestClient_consume(t *testing.T) {
msg := &ProducerMessage{
Payload: make([]byte, 10),
}
producer.Send(msg)
_, err = producer.Send(msg)
assert.Nil(t, err)
<-consumer.Chan()
......
......@@ -137,5 +137,6 @@ func TestConsumer_Seek(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, consumer)
consumer.Seek(0)
err = consumer.Seek(0)
assert.NotNil(t, err)
}
......@@ -18,8 +18,10 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
rocksmq "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"go.uber.org/zap"
)
func newTopicName() string {
......@@ -64,9 +66,18 @@ func newRocksMQ(rmqPath string) server.RocksMQ {
func removePath(rmqPath string) {
kvPath := rmqPath + "_kv"
os.RemoveAll(kvPath)
err := os.RemoveAll(kvPath)
if err != nil {
log.Error("os removeAll failed.", zap.Any("path", kvPath))
}
rocksdbPath := rmqPath + "_db"
os.RemoveAll(rocksdbPath)
err = os.RemoveAll(rocksdbPath)
if err != nil {
log.Error("os removeAll failed.", zap.Any("path", kvPath))
}
metaPath := rmqPath + "_meta_kv"
os.RemoveAll(metaPath)
err = os.RemoveAll(metaPath)
if err != nil {
log.Error("os removeAll failed.", zap.Any("path", kvPath))
}
}
......@@ -46,9 +46,10 @@ func Test_InitRmq(t *testing.T) {
func Test_InitRocksMQ(t *testing.T) {
// Params.Init()
rmqPath := "/tmp/milvus/rdb_data_global"
os.Setenv("ROCKSMQ_PATH", rmqPath)
err := os.Setenv("ROCKSMQ_PATH", rmqPath)
assert.Nil(t, err)
defer os.RemoveAll(rmqPath)
err := InitRocksMQ()
err = InitRocksMQ()
defer Rmq.stopRetention()
assert.NoError(t, err)
defer CloseRocksMQ()
......
......@@ -125,7 +125,10 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
// Because loadRetentionInfo may need some time, so do this asynchronously. Finally start retention goroutine.
func (ri *retentionInfo) startRetentionInfo() {
var wg sync.WaitGroup
ri.kv.ResetPrefixLength(FixedChannelNameLen)
err := ri.kv.ResetPrefixLength(FixedChannelNameLen)
if err != nil {
log.Warn("Start load retention info", zap.Error(err))
}
for _, topic := range ri.topics {
log.Debug("Start load retention info", zap.Any("topic", topic))
// Load all page infos
......@@ -304,7 +307,7 @@ func (ri *retentionInfo) retention() error {
// 4. Do delete by range of [start_msg_id, end_msg_id) in rocksdb
// 5. Delete corresponding data in retentionInfo
func (ri *retentionInfo) expiredCleanUp(topic string) error {
// log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
var ackedInfo *topicAckedInfo
if info, ok := ri.ackedInfo.Load(topic); ok {
ackedInfo = info.(*topicAckedInfo)
......@@ -428,7 +431,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
} else if pageStartID < pageEndID {
pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
}
ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch)
err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch)
if err != nil {
log.Error("rocksdb write error", zap.Error(err))
return err
}
pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:]
}
......@@ -456,7 +463,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
} else {
ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
}
ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch)
err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch)
if err != nil {
log.Error("rocksdb write error", zap.Error(err))
return err
}
// Update acked_size in rocksdb_kv
......
......@@ -29,7 +29,11 @@ import (
var retentionPath string = "/tmp/rmq_retention/"
func TestMain(m *testing.M) {
os.MkdirAll(retentionPath, os.ModePerm)
err := os.MkdirAll(retentionPath, os.ModePerm)
if err != nil {
log.Error("MkdirALl error for path", zap.Any("path", retentionPath))
return
}
code := m.Run()
os.Exit(code)
}
......@@ -196,7 +200,8 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
defer lock.Unlock()
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
initRetentionInfo(rmq.retentionInfo.kv, rmq.store)
_, err = initRetentionInfo(rmq.retentionInfo.kv, rmq.store)
assert.Nil(t, err)
dummyTopic := strings.Repeat(topicName, 100)
err = DeleteMessages(rmq.store, dummyTopic, 0, 0)
......@@ -208,90 +213,115 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
//////////////////////////////////////////////////
ackedTsPrefix, _ := constructKey(AckedTsTitle, topicName)
ackedTsKey0 := ackedTsPrefix + "/1"
rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy")
err = rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(ackedTsKey0)
err = rmq.retentionInfo.kv.Remove(ackedTsKey0)
assert.Nil(t, err)
//////////////////////////////////////////////////
ackedTsKey1 := ackedTsPrefix + "/dummy"
rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy")
err = rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(ackedTsKey1)
err = rmq.retentionInfo.kv.Remove(ackedTsKey1)
assert.Nil(t, err)
//////////////////////////////////////////////////
lastRetentionTsKey := LastRetTsTitle + topicName
rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10))
err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10))
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
assert.Nil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy")
err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
assert.Nil(t, err)
//////////////////////////////////////////////////
ackedSizeKey := AckedSizeTitle + topicName
rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10))
err = rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10))
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(ackedSizeKey)
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
assert.Nil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.Save(ackedSizeKey, "")
err = rmq.retentionInfo.kv.Save(ackedSizeKey, "")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(ackedSizeKey)
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
assert.Nil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy")
err = rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(ackedSizeKey)
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
assert.Nil(t, err)
//////////////////////////////////////////////////
topicBeginIDKey := TopicBeginIDTitle + topicName
rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy")
err = rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(topicBeginIDKey)
err = rmq.retentionInfo.kv.Remove(topicBeginIDKey)
assert.Nil(t, err)
////////////////////////////////////////////////////
fixedPageSizeKey0, _ := constructKey(PageMsgSizeTitle, topicName)
pageMsgSizeKey0 := fixedPageSizeKey0 + "/" + "1"
rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy")
err = rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(pageMsgSizeKey0)
err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey0)
assert.Nil(t, err)
//////////////////////////////////////////////////
fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName)
pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy"
rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy")
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.DB = nil
err = rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy")
assert.Nil(t, err)
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
assert.Nil(t, err)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
topicMu.Store(topicName, topicName)
rmq.retentionInfo.expiredCleanUp(topicName)
topicMu.Store(topicName, &sync.Mutex{})
err = rmq.retentionInfo.expiredCleanUp(topicName)
assert.Nil(t, err)
//////////////////////////////////////////////////
topicMu.Delete(topicName)
rmq.retentionInfo.expiredCleanUp(topicName)
err = rmq.retentionInfo.expiredCleanUp(topicName)
// TopicName has been deleted in line 310
assert.NotNil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.ackedInfo.Delete(topicName)
rmq.retentionInfo.expiredCleanUp(topicName)
err = rmq.retentionInfo.expiredCleanUp(topicName)
assert.Nil(t, err)
//////////////////////////////////////////////////
rmq.retentionInfo.kv.DB = nil
wg.Add(1)
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
}
func TestRmqRetention_Complex(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册