未验证 提交 65269a47 编写于 作者: X Xiaofan 提交者: GitHub

Remove rocksmq reader (#17149)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 a20e0dfc
......@@ -39,8 +39,6 @@ type Client interface {
// Create a consumer instance and subscribe a topic
Subscribe(options ConsumerOptions) (Consumer, error)
CreateReader(options ReaderOptions) (Reader, error)
// Close the client and free associated resources
Close()
}
......@@ -177,11 +177,6 @@ func (c *client) deliver(consumer *consumer, batchMax int) {
}
}
func (c *client) CreateReader(readerOptions ReaderOptions) (Reader, error) {
reader, err := newReader(c, &readerOptions)
return reader, err
}
// Close close the channel to notify rocksmq to stop operation and close rocksmq server
func (c *client) Close() {
c.closeOnce.Do(func() {
......
......@@ -128,9 +128,10 @@ func (c *consumer) Seek(id UniqueID) error { //nolint:govet
// Close destroy current consumer in rocksmq
func (c *consumer) Close() {
// TODO should panic?
err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName)
if err != nil {
log.Debug("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
log.Warn("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
}
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package client
import (
"context"
)
// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Reader
Message
}
// ReaderOptions abstraction Reader options to use.
type ReaderOptions struct {
// Topic specify the topic this consumer will subscribe on.
// This argument is required when constructing the reader.
Topic string
// Name set the reader name.
Name string
// Attach a set of application defined properties to the reader
// This properties will be visible in the topic stats
Properties map[string]string
// StartMessageID initial reader positioning is done by specifying a message id. The options are:
// * `MessageID` : Start reading from a particular message id, the reader will position itself on that
// specific position. The first message to be read will be the message next to the specified
// messageID
StartMessageID UniqueID
// If true, the reader will start at the `StartMessageID`, included.
// Default is `false` and the reader will start from the "next" message
StartMessageIDInclusive bool
// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
}
// Reader can be used to scan through all the messages currently available in a topic.
type Reader interface {
// Topic from which this reader is reading from
Topic() string
// Next read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
// HasNext check if there is any message available to read from the current position
HasNext() bool
// Close the reader and stop the broker to push more messages
Close()
// Reset the subscription associated with this reader to a specific message id.
Seek(UniqueID) error //nolint:govet
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package client
import (
"context"
)
// reader contains main options for rocksmq, and can only be set when newReader
type reader struct {
c *client
topic string
name string
startMessageID UniqueID
startMessageIDInclusive bool
subscriptionRolePrefix string
}
// newReader create a rocksmq reader from reader options
func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
if c == nil {
return nil, newError(InvalidConfiguration, "client is nil")
}
if readerOptions == nil {
return nil, newError(InvalidConfiguration, "options is nil")
}
if readerOptions.Topic == "" {
return nil, newError(InvalidConfiguration, "topic is empty")
}
reader := &reader{
c: c,
topic: readerOptions.Topic,
name: readerOptions.Name,
startMessageID: readerOptions.StartMessageID,
startMessageIDInclusive: readerOptions.StartMessageIDInclusive,
subscriptionRolePrefix: readerOptions.SubscriptionRolePrefix,
}
if c.server == nil {
return nil, newError(InvalidConfiguration, "rmq server in client is nil")
}
name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive, reader.subscriptionRolePrefix)
if err != nil {
return nil, err
}
reader.name = name
return reader, nil
}
//Topic return the topic name of the reader
func (r *reader) Topic() string {
return r.topic
}
// Next return the next message of reader, blocking until a message is available
func (r *reader) Next(ctx context.Context) (Message, error) {
cMsg, err := r.c.server.Next(ctx, r.topic, r.name)
if err != nil {
return Message{}, err
}
msg := Message{
MsgID: cMsg.MsgID,
Payload: cMsg.Payload,
Topic: r.topic,
}
return msg, nil
}
// HasNext check if there is a message available to read
func (r *reader) HasNext() bool {
return r.c.server.HasNext(r.topic, r.name)
}
// Close close the reader and stop the blocking reader
func (r *reader) Close() {
r.c.server.CloseReader(r.topic, r.name)
}
// Seek seek the reader to the position of message id
func (r *reader) Seek(msgID UniqueID) error { //nolint:govet
r.c.server.ReaderSeek(r.topic, r.name, msgID)
return nil
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package client
import (
"context"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_NewReader(t *testing.T) {
reader, err := newReader(nil, nil)
assert.Error(t, err)
assert.Nil(t, reader)
reader, err = newReader(newMockClient(), nil)
assert.Error(t, err)
assert.Nil(t, reader)
options := &ReaderOptions{}
reader, err = newReader(newMockClient(), options)
assert.Error(t, err)
assert.Nil(t, reader)
options.Topic = newTopicName()
reader, err = newReader(newMockClient(), options)
assert.Error(t, err)
assert.Nil(t, reader)
}
func TestReader_Next(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_reader"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := newClient(Options{
Server: rmq,
})
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
topicName := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.NotNil(t, producer)
assert.NoError(t, err)
msgNum := 10
ids := make([]UniqueID, 0)
for i := 0; i < msgNum; i++ {
msg := &ProducerMessage{
Payload: []byte("message_" + strconv.FormatInt(int64(i), 10)),
}
id, err := producer.Send(msg)
assert.NoError(t, err)
ids = append(ids, id)
}
reader1, err := newReader(client, &ReaderOptions{
Topic: topicName,
StartMessageIDInclusive: true,
SubscriptionRolePrefix: "reder1",
})
assert.NoError(t, err)
assert.NotNil(t, reader1)
assert.Equal(t, reader1.Topic(), topicName)
defer reader1.Close()
reader1.Seek(ids[1])
ctx := context.Background()
for i := 1; i < msgNum; i++ {
assert.True(t, reader1.HasNext())
rMsg, err := reader1.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, rMsg.MsgID, ids[i])
}
assert.False(t, reader1.HasNext())
reader2, err := newReader(client, &ReaderOptions{
Topic: topicName,
StartMessageIDInclusive: false,
SubscriptionRolePrefix: "reader2",
})
assert.NoError(t, err)
reader2.Seek(ids[5])
for i := 5; i < msgNum-1; i++ {
assert.True(t, reader2.HasNext())
rMsg, err := reader2.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, rMsg.MsgID, ids[i+1])
}
assert.False(t, reader2.HasNext())
}
......@@ -11,8 +11,6 @@
package server
import "context"
// ProducerMessage that will be written to rocksdb
type ProducerMessage struct {
Payload []byte
......@@ -51,10 +49,4 @@ type RocksMQ interface {
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error)
Notify(topicName, groupName string)
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error)
ReaderSeek(topicName string, readerName string, msgID UniqueID) error
Next(ctx context.Context, topicName string, readerName string) (*ConsumerMessage, error)
HasNext(topicName string, readerName string) bool
CloseReader(topicName string, readerName string)
}
......@@ -12,7 +12,6 @@
package server
import (
"context"
"errors"
"fmt"
"math"
......@@ -289,7 +288,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
// Check if topicName contains "/"
if strings.Contains(topicName, "/") {
log.Error("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName))
log.Warn("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName))
return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
}
......@@ -300,7 +299,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
return err
}
if val != "" {
log.Debug("rocksmq topic already exists ", zap.String("topic", topicName))
log.Warn("rocksmq topic already exists ", zap.String("topic", topicName))
return nil
}
......@@ -390,12 +389,6 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
topicMu.Delete(topicName)
rmq.retentionInfo.topicRetetionTime.Delete(topicName)
// clean up reader
if val, ok := rmq.readers.LoadAndDelete(topicName); ok {
for _, reader := range val.([]*rocksmqReader) {
reader.Close()
}
}
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
return nil
}
......@@ -533,7 +526,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
if err != nil {
log.Error("RocksMQ: alloc id failed.", zap.Error(err))
return []UniqueID{}, err
}
allocTime := time.Since(start).Milliseconds()
......@@ -558,7 +550,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
defer opts.Destroy()
err = rmq.store.Write(opts, batch)
if err != nil {
log.Debug("RocksMQ: write batch failed")
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
......@@ -573,16 +564,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
}
}
// Notify reader
if val, ok := rmq.readers.Load(topicName); ok {
for _, reader := range val.([]*rocksmqReader) {
select {
case reader.readerMutex <- struct{}{}:
default:
}
}
}
// Update message page info
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
......@@ -688,7 +669,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
offset++
msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64)
if err != nil {
log.Warn("RocksMQ: parse int " + strKey[len(topicName)+1:] + " failed")
val.Free()
return nil, err
}
......@@ -751,7 +731,6 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err
key := constructCurrentID(topicName, groupName)
_, ok := rmq.consumersID.Load(key)
if !ok {
log.Warn("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
......@@ -760,7 +739,6 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err
defer opts.Destroy()
val, err := rmq.store.Get(opts, []byte(storeKey))
if err != nil {
log.Warn("RocksMQ: get " + storeKey + " failed")
return err
}
defer val.Free()
......@@ -800,7 +778,6 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
err := rmq.seek(topicName, groupName, msgID)
if err != nil {
log.Debug("failed to seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID)), zap.Error(err))
return err
}
log.Debug("successfully seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID)))
......@@ -817,7 +794,6 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
key := constructCurrentID(topicName, groupName)
_, ok := rmq.consumersID.Load(key)
if !ok {
log.Warn("RocksMQ: channel " + key + " not exists")
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
......@@ -968,127 +944,3 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
}
return nil
}
// CreateReader create a reader for topic and generate reader name
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) {
if rmq.isClosed() {
return "", errors.New(RmqNotServingErrMsg)
}
if _, ok := topicMu.Load(topicName); !ok {
return "", fmt.Errorf("topic=%s not exist", topicName)
}
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts.SetPrefixSameAsStart(true)
iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(topicName+"/"), readOpts)
dataKey := path.Join(topicName, strconv.FormatInt(startMsgID, 10))
iter.Seek([]byte(dataKey))
// if iterate fail
if err := iter.Err(); err != nil {
return "", err
}
nowTs, err := getNowTs(rmq.idAllocator)
if err != nil {
return "", errors.New("Can't get current ts from rocksmq idAllocator")
}
readerName := subscriptionRolePrefix + ReaderNamePrefix + strconv.FormatInt(nowTs, 10)
reader := &rocksmqReader{
store: rmq.store,
topic: topicName,
readerName: readerName,
readOpts: readOpts,
iter: iter,
currentID: startMsgID,
messageIDInclusive: messageIDInclusive,
readerMutex: make(chan struct{}, 1),
}
if vals, ok := rmq.readers.Load(topicName); ok {
readers := vals.([]*rocksmqReader)
readers = append(readers, reader)
rmq.readers.Store(topicName, readers)
} else {
readers := make([]*rocksmqReader, 1)
readers[0] = reader
rmq.readers.Store(topicName, readers)
}
return readerName, nil
}
func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader {
if vals, ok := rmq.readers.Load(topicName); ok {
for _, v := range vals.([]*rocksmqReader) {
if v.readerName == readerName {
return v
}
}
}
return nil
}
func (rmq *rocksmq) getAndDeleteReader(topicName, readerName string) *rocksmqReader {
if vals, ok := rmq.readers.Load(topicName); ok {
readers := vals.([]*rocksmqReader)
for i, v := range vals.([]*rocksmqReader) {
if v.readerName == readerName {
readers[i] = readers[len(readers)-1]
rmq.readers.Store(topicName, readers[:len(readers)-1])
return v
}
}
}
return nil
}
// ReaderSeek seek a reader to the pointed position
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
return fmt.Errorf("reader not exist, topic %s, reader %s", topicName, readerName)
}
reader.Seek(msgID)
return nil
}
// Next get the next message of reader
func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string) (*ConsumerMessage, error) {
if rmq.isClosed() {
return nil, errors.New(RmqNotServingErrMsg)
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
return nil, fmt.Errorf("reader of %s doesn't exist", topicName)
}
return reader.Next(ctx)
}
// HasNext judge whether reader has next message
func (rmq *rocksmq) HasNext(topicName string, readerName string) bool {
if rmq.isClosed() {
return false
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
return false
}
return reader.HasNext()
}
// CloseReader close a reader
func (rmq *rocksmq) CloseReader(topicName string, readerName string) {
if rmq.isClosed() {
return
}
reader := rmq.getAndDeleteReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
return
}
reader.Close()
reader = nil
}
......@@ -12,7 +12,6 @@
package server
import (
"context"
"fmt"
"log"
"os"
......@@ -726,128 +725,6 @@ func TestRocksmq_SeekToLatest(t *testing.T) {
}
}
func TestRocksmq_Reader(t *testing.T) {
ep := etcdEndpoints()
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
assert.Nil(t, err)
defer etcdCli.Close()
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_reader"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
channelName := newChanName()
_, err = rmq.CreateReader(channelName, 0, true, "")
assert.Error(t, err)
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
loopNum := 100
pMsgs := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i+loopNum)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids, err := rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(ids), loopNum)
readerName1, err := rmq.CreateReader(channelName, ids[0], true, "test-reader-true")
assert.NoError(t, err)
rmq.ReaderSeek(channelName, readerName1, ids[0])
ctx := context.Background()
for i := 0; i < loopNum; i++ {
assert.Equal(t, true, rmq.HasNext(channelName, readerName1))
msg, err := rmq.Next(ctx, channelName, readerName1)
assert.NoError(t, err)
assert.Equal(t, msg.MsgID, ids[i])
}
assert.False(t, rmq.HasNext(channelName, readerName1))
readerName2, err := rmq.CreateReader(channelName, ids[0], false, "test-reader-false")
assert.NoError(t, err)
rmq.ReaderSeek(channelName, readerName2, ids[0])
for i := 0; i < loopNum-1; i++ {
assert.Equal(t, true, rmq.HasNext(channelName, readerName2))
msg, err := rmq.Next(ctx, channelName, readerName2)
assert.NoError(t, err)
assert.Equal(t, msg.MsgID, ids[i+1])
}
assert.False(t, rmq.HasNext(channelName, readerName2))
}
func TestReader_CornerCase(t *testing.T) {
ep := etcdEndpoints()
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
assert.Nil(t, err)
defer etcdCli.Close()
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_reader_cornercase"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
channelName := newChanName()
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
defer rmq.DestroyTopic(channelName)
loopNum := 10
pMsgs := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i+loopNum)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs[i] = pMsg
}
ids, err := rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
assert.Equal(t, len(ids), loopNum)
readerName, err := rmq.CreateReader(channelName, ids[loopNum-1], true, "cornercase")
assert.NoError(t, err)
ctx := context.Background()
msg, err := rmq.Next(ctx, channelName, readerName)
assert.NoError(t, err)
assert.Equal(t, msg.MsgID, ids[loopNum-1])
var extraIds []UniqueID
go func() {
time.Sleep(1 * time.Second)
extraMsgs := make([]ProducerMessage, 1)
msg := "extra_message"
extraMsgs[0] = ProducerMessage{Payload: []byte(msg)}
extraIds, _ = rmq.Produce(channelName, extraMsgs)
// assert.NoError(t, er)
assert.Equal(t, 1, len(extraIds))
}()
msg, err = rmq.Next(ctx, channelName, readerName)
assert.NoError(t, err)
assert.Equal(t, string(msg.Payload), "extra_message")
}
func TestRocksmq_GetLatestMsg(t *testing.T) {
ep := etcdEndpoints()
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
......@@ -953,13 +830,6 @@ func TestRocksmq_Close(t *testing.T) {
assert.Error(t, rmq.seek("", "", 0))
assert.Error(t, rmq.SeekToLatest("", ""))
_, err = rmq.CreateReader("", 0, false, "")
assert.Error(t, err)
rmq.ReaderSeek("", "", 0)
_, err = rmq.Next(nil, "", "")
assert.Error(t, err)
rmq.HasNext("", "")
rmq.CloseReader("", "")
}
func TestRocksmq_SeekWithNoConsumerError(t *testing.T) {
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"path"
"strconv"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/tecbot/gorocksdb"
)
type rocksmqReader struct {
store *gorocksdb.DB
topic string
readerName string
readOpts *gorocksdb.ReadOptions
iter *rocksdbkv.RocksIterator
currentID UniqueID
messageIDInclusive bool
readerMutex chan struct{}
}
//Seek seek the rocksmq reader to the pointed position
func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
rr.currentID = msgID
dataKey := path.Join(rr.topic, strconv.FormatInt(msgID, 10))
rr.iter.Seek([]byte(dataKey))
if !rr.messageIDInclusive {
rr.currentID++
rr.iter.Next()
}
}
func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
var err error
iter := rr.iter
var msg *ConsumerMessage
getMsg := func() {
key := iter.Key()
val := iter.Value()
tmpKey := string(key.Data())
if key != nil {
key.Free()
}
var msgID UniqueID
msgID, err = strconv.ParseInt(tmpKey[len(rr.topic)+1:], 10, 64)
msg = &ConsumerMessage{
MsgID: msgID,
}
origData := val.Data()
dataLen := len(origData)
if dataLen > 0 {
msg.Payload = make([]byte, dataLen)
copy(msg.Payload, origData)
}
if val != nil {
val.Free()
}
iter.Next()
rr.currentID = msgID
}
if iter.Valid() {
getMsg()
return msg, err
}
// TODO this is the same logic as pulsar reader, but do we really need to read till the end of the stream
select {
case <-ctx.Done():
log.Debug("Stop get next reader message!")
return nil, ctx.Err()
case _, ok := <-rr.readerMutex:
if !ok {
log.Warn("reader Mutex closed")
return nil, fmt.Errorf("reader Mutex closed")
}
rr.iter.Close()
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
iter = rr.iter
iter.Seek([]byte(dataKey))
if !iter.Valid() {
return nil, errors.New("reader iterater is still invalid after receive mutex")
}
getMsg()
return msg, err
}
}
func (rr *rocksmqReader) HasNext() bool {
if rr.iter.Valid() {
return true
}
select {
case _, ok := <-rr.readerMutex:
if !ok {
return false
}
rr.iter.Close()
rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
rr.iter.Seek([]byte(dataKey))
return rr.iter.Valid()
default:
return false
}
}
func (rr *rocksmqReader) Close() {
close(rr.readerMutex)
rr.iter.Close()
rr.readOpts.Destroy()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册