未验证 提交 2147507e 编写于 作者: Y yukun 提交者: GitHub

Add reader implementation in rocksmq (#12137)

Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>
上级 b57d08bd
package mqclient
import (
"context"
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
)
var _ Reader = (*rmqReader)(nil)
type rmqReader struct {
r rocksmq.Reader
}
func (rr *rmqReader) Topic() string {
return rr.r.Topic()
}
func (rr *rmqReader) Next(ctx context.Context) (Message, error) {
rMsg, err := rr.r.Next(ctx)
if err != nil {
return nil, err
}
msg := &rmqMessage{msg: rMsg}
return msg, nil
}
func (rr *rmqReader) HasNext() bool {
return rr.r.HasNext()
}
func (rr *rmqReader) Seek(id MessageID) error {
msgID := id.(*rmqID).messageID
return rr.r.Seek(msgID)
}
func (rr *rmqReader) Close() {
rr.r.Close()
}
......@@ -39,6 +39,8 @@ type Client interface {
// Create a consumer instance and subscribe a topic
Subscribe(options ConsumerOptions) (Consumer, error)
CreateReader(options ReaderOptions) (Reader, error)
// Close the client and free associated resources
Close()
}
......@@ -158,6 +158,11 @@ func (c *client) consume(consumer *consumer) {
}
}
func (c *client) CreateReader(readerOptions ReaderOptions) (Reader, error) {
reader, err := newReader(c, &readerOptions)
return reader, err
}
// Close close the channel to notify rocksmq to stop operation and close rocksmq server
func (c *client) Close() {
// TODO(yukun): Should call server.close() here?
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rocksmq
import "context"
type reader struct {
c *client
topic string
name string
startMessageID UniqueID
startMessageIDInclusive bool
}
func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
if c == nil {
return nil, newError(InvalidConfiguration, "client is nil")
}
if readerOptions == nil {
return nil, newError(InvalidConfiguration, "options is nil")
}
if readerOptions.Topic == "" {
return nil, newError(InvalidConfiguration, "topic is empty")
}
reader := &reader{
c: c,
topic: readerOptions.Topic,
name: readerOptions.Name,
startMessageID: readerOptions.StartMessageID,
startMessageIDInclusive: readerOptions.StartMessageIDInclusive,
}
if c.server == nil {
return nil, newError(InvalidConfiguration, "rmq server in client is nil")
}
err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive)
return reader, err
}
func (r *reader) Topic() string {
return r.topic
}
func (r *reader) Next(ctx context.Context) (Message, error) {
cMsg, err := r.c.server.Next(ctx, r.topic, r.startMessageIDInclusive)
if err != nil {
return Message{}, err
}
msg := Message{
MsgID: cMsg.MsgID,
Payload: cMsg.Payload,
Topic: r.topic,
}
return msg, nil
}
func (r *reader) HasNext() bool {
return r.c.server.HasNext(r.topic, r.startMessageIDInclusive)
}
func (r *reader) Close() {
r.c.server.CloseReader(r.topic)
}
func (r *reader) Seek(msgID UniqueID) error { //nolint:govet
r.c.server.ReaderSeek(r.topic, msgID)
return nil
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rocksmq
import (
"context"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_NewReader(t *testing.T) {
reader, err := newReader(nil, nil)
assert.Error(t, err)
assert.Nil(t, reader)
reader, err = newReader(newMockClient(), nil)
assert.Error(t, err)
assert.Nil(t, reader)
options := &ReaderOptions{}
reader, err = newReader(newMockClient(), options)
assert.Error(t, err)
assert.Nil(t, reader)
options.Topic = newTopicName()
reader, err = newReader(newMockClient(), options)
assert.Error(t, err)
assert.Nil(t, reader)
}
func TestReader_Next(t *testing.T) {
rmqPath := "/tmp/milvus/test_reader"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
topicName := newTopicName()
reader, err := newReader(client, &ReaderOptions{
Topic: topicName,
StartMessageIDInclusive: true,
})
assert.NoError(t, err)
assert.NotNil(t, reader)
assert.Equal(t, reader.Topic(), topicName)
defer reader.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.NotNil(t, producer)
assert.NoError(t, err)
msgNum := 10
ids := make([]UniqueID, 0)
for i := 0; i < msgNum; i++ {
msg := &ProducerMessage{
Payload: []byte("message_" + strconv.FormatInt(int64(i), 10)),
}
id, err := producer.Send(msg)
assert.NoError(t, err)
ids = append(ids, id)
}
reader.Seek(ids[1])
ctx := context.Background()
for i := 1; i < msgNum; i++ {
assert.True(t, reader.HasNext())
rMsg, err := reader.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, rMsg.MsgID, ids[i])
}
assert.False(t, reader.HasNext())
reader.startMessageIDInclusive = false
reader.Seek(ids[5])
for i := 5; i < msgNum-1; i++ {
assert.True(t, reader.HasNext())
rMsg, err := reader.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, rMsg.MsgID, ids[i+1])
}
assert.False(t, reader.HasNext())
}
......@@ -11,6 +11,8 @@
package rocksmq
import "context"
// ProducerMessage that will be write to rocksdb
type ProducerMessage struct {
Payload []byte
......@@ -47,4 +49,10 @@ type RocksMQ interface {
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)
Notify(topicName, groupName string)
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error
ReaderSeek(topicName string, msgID UniqueID)
Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error)
HasNext(topicName string, messageIDInclusive bool) bool
CloseReader(topicName string)
}
......@@ -12,6 +12,7 @@
package rocksmq
import (
"context"
"errors"
"fmt"
"math"
......@@ -125,6 +126,7 @@ type rocksmq struct {
ackedMu sync.Map
retentionInfo *retentionInfo
readers sync.Map
}
// NewRocksMQ step:
......@@ -159,6 +161,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
storeMu: &sync.Mutex{},
consumers: sync.Map{},
ackedMu: sync.Map{},
readers: sync.Map{},
}
ri, err := initRetentionInfo(kv, db)
......@@ -306,6 +309,13 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
// clean up retention info
topicMu.Delete(topicName)
rmq.retentionInfo.topics.Delete(topicName)
// clean up reader
if val, ok := rmq.readers.LoadAndDelete(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
reader.Close()
}
}
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
......@@ -490,6 +500,16 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
}
}
// Notify reader
if val, ok := rmq.readers.Load(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
select {
case reader.readerMutex <- struct{}{}:
default:
}
}
}
// Update message page info
// TODO(yukun): Should this be in a go routine
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
......@@ -893,3 +913,66 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
}
return nil
}
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error {
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts.SetPrefixSameAsStart(true)
iter := rmq.store.NewIterator(readOpts)
fixChanName, err := fixChannelName(topicName)
if err != nil {
log.Debug("RocksMQ: fixChannelName " + topicName + " failed")
return err
}
dataKey := path.Join(fixChanName, strconv.FormatInt(startMsgID, 10))
iter.Seek([]byte(dataKey))
if !iter.Valid() {
log.Warn("iterator of startMsgID is invalid")
}
reader := &rocksmqReader{
store: rmq.store,
topic: topicName,
readOpts: readOpts,
iter: iter,
currentID: startMsgID,
messageIDInclusive: messageIDInclusive,
readerMutex: make(chan struct{}, 1),
}
rmq.readers.Store(topicName, reader)
return nil
}
func (rmq *rocksmq) ReaderSeek(topicName string, msgID UniqueID) {
if val, ok := rmq.readers.Load(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
reader.Seek(msgID)
}
}
}
func (rmq *rocksmq) Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) {
if val, ok := rmq.readers.Load(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
return reader.Next(ctx, messageIDInclusive)
}
}
return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName)
}
func (rmq *rocksmq) HasNext(topicName string, messageIDInclusive bool) bool {
if val, ok := rmq.readers.Load(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
return reader.HasNext(messageIDInclusive)
}
}
return false
}
func (rmq *rocksmq) CloseReader(topicName string) {
if val, ok := rmq.readers.Load(topicName); ok {
if reader, rOk := val.(*rocksmqReader); rOk {
reader.Close()
}
}
}
......@@ -12,6 +12,8 @@
package rocksmq
import (
"context"
"fmt"
"log"
"os"
"strconv"
......@@ -44,6 +46,14 @@ func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
return idAllocator
}
func newChanName() string {
return fmt.Sprintf("my-chan-%v", time.Now().Nanosecond())
}
func newGroupName() string {
return fmt.Sprintf("my-group-%v", time.Now().Nanosecond())
}
func Test_FixChannelName(t *testing.T) {
name := "abcd"
fixName, err := fixChannelName(name)
......@@ -605,3 +615,59 @@ func TestRocksmq_SeekToLatest(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 0)
}
func TestRocksmq_Reader(t *testing.T) {
ep := etcdEndpoints()
etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_reader"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
channelName := newChanName()
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
loopNum := 100
err = rmq.CreateReader(channelName, 0, true)
assert.NoError(t, err)
pMsgs := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i+loopNum)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids, err := rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(ids), loopNum)
rmq.ReaderSeek(channelName, ids[0])
ctx := context.Background()
for i := 0; i < loopNum; i++ {
assert.Equal(t, true, rmq.HasNext(channelName, true))
msg, err := rmq.Next(ctx, channelName, true)
assert.NoError(t, err)
assert.Equal(t, msg.MsgID, ids[i])
}
assert.False(t, rmq.HasNext(channelName, true))
rmq.ReaderSeek(channelName, ids[0])
for i := 0; i < loopNum-1; i++ {
assert.Equal(t, true, rmq.HasNext(channelName, false))
msg, err := rmq.Next(ctx, channelName, false)
assert.NoError(t, err)
assert.Equal(t, msg.MsgID, ids[i+1])
}
assert.False(t, rmq.HasNext(channelName, false))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package rocksmq
import (
"context"
"fmt"
"path"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/tecbot/gorocksdb"
)
type rocksmqReader struct {
store *gorocksdb.DB
topic string
readOpts *gorocksdb.ReadOptions
iter *gorocksdb.Iterator
currentID UniqueID
messageIDInclusive bool
readerMutex chan struct{}
}
func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
rr.currentID = msgID
select {
case rr.readerMutex <- struct{}{}:
default:
}
}
func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (ConsumerMessage, error) {
ll, ok := topicMu.Load(rr.topic)
if !ok {
return ConsumerMessage{}, fmt.Errorf("topic name = %s not exist", rr.topic)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return ConsumerMessage{}, fmt.Errorf("get mutex failed, topic name = %s", rr.topic)
}
lock.Lock()
defer lock.Unlock()
fixChanName, err := fixChannelName(rr.topic)
if err != nil {
log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed")
return ConsumerMessage{}, err
}
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
var msg ConsumerMessage
readOpts.SetPrefixSameAsStart(true)
iter := rr.store.NewIterator(readOpts)
defer iter.Close()
for {
select {
case <-ctx.Done():
log.Debug("Stop get next reader message!")
return ConsumerMessage{}, nil
case <-rr.readerMutex:
dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10))
if iter.Seek([]byte(dataKey)); !iter.Valid() {
continue
}
if messageIDInclusive {
val, err := rr.store.Get(readOpts, []byte(dataKey))
if err != nil {
return ConsumerMessage{}, err
}
if !val.Exists() {
continue
}
msg = ConsumerMessage{
MsgID: rr.currentID,
}
origData := val.Data()
dataLen := len(origData)
if dataLen == 0 {
msg.Payload = nil
} else {
msg.Payload = make([]byte, dataLen)
copy(msg.Payload, origData)
}
val.Free()
// Update nextID in readerOffset
var nextID UniqueID
iter.Next()
if iter.Valid() {
key := iter.Key()
nextID, err = strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64)
if key.Exists() {
key.Free()
}
if err != nil {
return ConsumerMessage{}, err
}
rr.readerMutex <- struct{}{}
} else {
nextID = rr.currentID + 1
}
rr.currentID = nextID
} else {
iter.Next()
if iter.Valid() {
key := iter.Key()
tmpKey := string(key.Data())
key.Free()
id, err := strconv.ParseInt(tmpKey[FixedChannelNameLen+1:], 10, 64)
if err != nil {
return ConsumerMessage{}, err
}
val := iter.Value()
msg = ConsumerMessage{
MsgID: id,
}
origData := val.Data()
dataLen := len(origData)
if dataLen == 0 {
msg.Payload = nil
} else {
msg.Payload = make([]byte, dataLen)
copy(msg.Payload, origData)
}
val.Free()
rr.currentID = id
rr.readerMutex <- struct{}{}
}
}
return msg, nil
}
}
}
func (rr *rocksmqReader) HasNext(messageIDInclusive bool) bool {
ll, ok := topicMu.Load(rr.topic)
if !ok {
return false
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return false
}
lock.Lock()
defer lock.Unlock()
fixChanName, err := fixChannelName(rr.topic)
if err != nil {
log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed")
return false
}
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
readOpts.SetPrefixSameAsStart(true)
iter := rr.store.NewIterator(readOpts)
defer iter.Close()
dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10))
iter.Seek([]byte(dataKey))
if !iter.Valid() {
return false
}
if messageIDInclusive {
return true
}
iter.Next()
return iter.Valid()
}
func (rr *rocksmqReader) Close() {
close(rr.readerMutex)
rr.iter.Close()
rr.readOpts.Destroy()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册