diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go index 376e86bf711ade6807874c385e243b2ed344b940..bec1f3490a926617b8c446ab5ed958db881bd8cc 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go @@ -539,15 +539,15 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { assert.Equal(t, len(cMsgs), msgNum) assert.Equal(t, cMsgs[0].MsgID, ids2[0]) - time.Sleep(time.Duration(3) * time.Second) - - err = rmq.ForceSeek(topicName, groupName, ids[10]) - assert.Nil(t, err) - newRes, err := rmq.Consume(topicName, groupName, 1) - assert.Nil(t, err) - assert.Equal(t, len(newRes), 1) - // point to first not consumed messages - assert.Equal(t, newRes[0].MsgID, ids2[0]) + assert.Eventually(t, func() bool { + err = rmq.ForceSeek(topicName, groupName, ids[0]) + assert.Nil(t, err) + newRes, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + assert.Equal(t, len(newRes), 1) + // point to first not consumed messages + return newRes[0].MsgID == ids2[0] + }, 5*time.Second, 1*time.Second) // test acked size acked ts and other meta are updated as expect msgSizeKey := MessageSizeTitle + topicName