未验证 提交 e0761271 编写于 作者: Y yukun 提交者: GitHub

[skip ci]Add rocksmq_retention comment (#8016)

Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>
上级 0aa65f55
......@@ -107,6 +107,8 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
return ri, nil
}
// Before do retention, load retention info from rocksdb to retention info structure in goroutines.
// Because loadRetentionInfo may need some time, so do this asynchronously. Finally start retention goroutine.
func (ri *retentionInfo) startRetentionInfo() error {
var wg sync.WaitGroup
ri.kv.ResetPrefixLength(FixedChannelNameLen)
......@@ -123,6 +125,7 @@ func (ri *retentionInfo) startRetentionInfo() error {
return nil
}
// Read retention infos from rocksdb so that retention check can be done based on memory data
func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
// TODO(yukun): If there needs to add lock
// ll, ok := topicMu.Load(topic)
......@@ -248,6 +251,7 @@ func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
func (ri *retentionInfo) retention() error {
log.Debug("Rocksmq retention goroutine start!")
// Do retention check every 6s
ticker := time.NewTicker(time.Duration(TickerTimeInMinutes * int64(time.Minute) / 10))
for {
......@@ -271,6 +275,11 @@ func (ri *retentionInfo) retention() error {
}
}
// 1. Obtain pageAckedInfo and do time expired check, get the expired page scope;
// 2. Do iteration in the page after the last page in step 1 and get the last time expired message id;
// 3. Do size expired check in next page, and get the last size expired message id;
// 4. Do delete by range of [start_msg_id, end_msg_id) in rocksdb
// 5. Delete corresponding data in retentionInfo
func (ri *retentionInfo) expiredCleanUp(topic string) error {
// log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
var ackedInfo *topicAckedInfo
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册