diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 1a3b97c6a9902c8046507604c28a112e20c588a4..f065ef4b0bf9555440ab99435826b689be558048 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -260,8 +260,8 @@ func (ri *retentionInfo) retention() error { return nil case t := <-ticker.C: timeNow := t.Unix() - checkTime := RocksmqRetentionTimeInMinutes * 60 / 10 - log.Debug("In ticker: ", zap.Any("ticker", timeNow)) + checkTime := RocksmqRetentionTimeInMinutes * MINUTE / 10 + log.Debug("A retention triggered by time ticker: ", zap.Any("ticker", timeNow)) ri.lastRetentionTime.Range(func(k, v interface{}) bool { if v.(int64)+checkTime < timeNow { err := ri.expiredCleanUp(k.(string)) @@ -271,14 +271,6 @@ func (ri *retentionInfo) retention() error { } return true }) - // for k, v := range ri.lastRetentionTime { - // if v+checkTime < timeNow { - // err := ri.expiredCleanUp(k) - // if err != nil { - // panic(err) - // } - // } - // } } } } @@ -350,7 +342,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } } } - log.Debug("In expiredCleanUp: ", zap.Any("topic", topic), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + log.Debug("Expired check by page info", zap.Any("topic", topic), zap.Any("pageEndID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) pageEndID := endID // The end msg of the page is not expired, find the last expired msg in this page @@ -369,9 +361,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } } if endID == 0 { - log.Debug("All messages are not expired") + log.Debug("All messages are not expired", zap.Any("topic", topic)) return nil } + log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) // Delete page message size in rocksdb_kv if pageInfo != nil { @@ -388,6 +381,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { break } } + log.Debug("Expired check by retention size", zap.Any("topic", topic), zap.Any("new endID", endID), zap.Any("new deletedAckedSize", deletedAckedSize)) } if pageEndID > 0 && len(pageInfo.pageEndID) > 0 { @@ -403,7 +397,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID)) if pageStartID == pageEndID { pageWriteBatch.Delete([]byte(pageStartKey)) - } else { + } else if pageStartID < pageEndID { pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey)) } ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch) @@ -412,6 +406,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } ri.pageInfo.Store(topic, pageInfo) } + log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) // Delete acked_ts in rocksdb_kv fixedAckedTsTitle, err := constructKey(AckedTsTitle, topic) @@ -422,7 +417,9 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { ackedEndIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(endID)) ackedTsWriteBatch := gorocksdb.NewWriteBatch() defer ackedTsWriteBatch.Clear() - if startID == endID { + if startID > endID { + return nil + } else if startID == endID { ackedTsWriteBatch.Delete([]byte(ackedStartIDKey)) } else { ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) @@ -470,7 +467,6 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Clear() - log.Debug("Delete messages by range", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID)) if startID == endID { writeBatch.Delete([]byte(startKey)) } else {