提交 e3fadc45 编写于 作者: Y yukun 提交者: yefu.chen

Fix for new msgstream interface

Signed-off-by: Nyukun <kun.yu@zilliz.com>
上级 b89e5a32
...@@ -183,7 +183,7 @@ func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error { ...@@ -183,7 +183,7 @@ func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
func (meta *meta) DropSegment(segmentID UniqueID) error { func (meta *meta) DropSegment(segmentID UniqueID) error {
meta.ddLock.Lock() meta.ddLock.Lock()
meta.ddLock.Unlock() defer meta.ddLock.Unlock()
if _, ok := meta.segID2Info[segmentID]; !ok { if _, ok := meta.segID2Info[segmentID]; !ok {
return newErrSegmentNotFound(segmentID) return newErrSegmentNotFound(segmentID)
......
...@@ -71,7 +71,7 @@ type ( ...@@ -71,7 +71,7 @@ type (
} }
) )
func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl, error) { func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl {
segmentAllocator := &segmentAllocatorImpl{ segmentAllocator := &segmentAllocatorImpl{
mt: meta, mt: meta,
segments: make(map[UniqueID]*segmentStatus), segments: make(map[UniqueID]*segmentStatus),
...@@ -80,7 +80,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl ...@@ -80,7 +80,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl
segmentThresholdFactor: Params.SegmentSizeFactor, segmentThresholdFactor: Params.SegmentSizeFactor,
allocator: allocator, allocator: allocator,
} }
return segmentAllocator, nil return segmentAllocator
} }
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error { func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
......
...@@ -17,8 +17,7 @@ func TestAllocSegment(t *testing.T) { ...@@ -17,8 +17,7 @@ func TestAllocSegment(t *testing.T) {
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator) meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err) assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator) segAllocator := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := newTestSchema() schema := newTestSchema()
collID, err := mockAllocator.allocID() collID, err := mockAllocator.allocID()
...@@ -68,8 +67,7 @@ func TestSealSegment(t *testing.T) { ...@@ -68,8 +67,7 @@ func TestSealSegment(t *testing.T) {
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator) meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err) assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator) segAllocator := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := newTestSchema() schema := newTestSchema()
collID, err := mockAllocator.allocID() collID, err := mockAllocator.allocID()
...@@ -105,8 +103,7 @@ func TestExpireSegment(t *testing.T) { ...@@ -105,8 +103,7 @@ func TestExpireSegment(t *testing.T) {
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator) meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err) assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, mockAllocator) segAllocator := newSegmentAllocator(meta, mockAllocator)
assert.Nil(t, err)
schema := newTestSchema() schema := newTestSchema()
collID, err := mockAllocator.allocID() collID, err := mockAllocator.allocID()
......
...@@ -134,10 +134,7 @@ func (s *Server) Start() error { ...@@ -134,10 +134,7 @@ func (s *Server) Start() error {
return err return err
} }
s.statsHandler = newStatsHandler(s.meta) s.statsHandler = newStatsHandler(s.meta)
s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator) s.segAllocator = newSegmentAllocator(s.meta, s.allocator)
if err != nil {
return err
}
s.ddHandler = newDDHandler(s.meta, s.segAllocator) s.ddHandler = newDDHandler(s.meta, s.segAllocator)
s.initSegmentInfoChannel() s.initSegmentInfoChannel()
if err = s.loadMetaFromMaster(); err != nil { if err = s.loadMetaFromMaster(); err != nil {
......
...@@ -21,7 +21,7 @@ func TestDataNodeTTWatcher(t *testing.T) { ...@@ -21,7 +21,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
allocator := newMockAllocator() allocator := newMockAllocator()
meta, err := newMemoryMeta(allocator) meta, err := newMemoryMeta(allocator)
assert.Nil(t, err) assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, allocator) segAllocator := newSegmentAllocator(meta, allocator)
assert.Nil(t, err) assert.Nil(t, err)
watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster) watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)
......
...@@ -747,7 +747,7 @@ func checkTimeTickMsg(msg map[Consumer]Timestamp, ...@@ -747,7 +747,7 @@ func checkTimeTickMsg(msg map[Consumer]Timestamp,
for consumer := range msg { for consumer := range msg {
mu.RLock() mu.RLock()
v := msg[consumer] v := msg[consumer]
mu.Unlock() mu.RUnlock()
if v != maxTime { if v != maxTime {
isChannelReady[consumer] = false isChannelReady[consumer] = false
} else { } else {
......
package rmqms
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type Factory struct {
dispatcherFactory msgstream.ProtoUDFactory
address string
receiveBufSize int64
pulsarBufSize int64
}
func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return newRmqMsgStream(ctx, f.receiveBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory {
f := &Factory{
dispatcherFactory: msgstream.ProtoUDFactory{},
address: address,
receiveBufSize: receiveBufSize,
pulsarBufSize: pulsarBufSize,
}
return f
}
...@@ -16,6 +16,17 @@ import ( ...@@ -16,6 +16,17 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
) )
type TsMsg = msgstream.TsMsg
type MsgPack = msgstream.MsgPack
type MsgType = msgstream.MsgType
type UniqueID = msgstream.UniqueID
type BaseMsg = msgstream.BaseMsg
type Timestamp = msgstream.Timestamp
type IntPrimaryKey = msgstream.IntPrimaryKey
type TimeTickMsg = msgstream.TimeTickMsg
type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg
type RepackFunc = msgstream.RepackFunc
type RmqMsgStream struct { type RmqMsgStream struct {
isServing int64 isServing int64
ctx context.Context ctx context.Context
...@@ -23,7 +34,6 @@ type RmqMsgStream struct { ...@@ -23,7 +34,6 @@ type RmqMsgStream struct {
serverLoopCtx context.Context serverLoopCtx context.Context
serverLoopCancel func() serverLoopCancel func()
rmq *rocksmq.RocksMQ
repackFunc msgstream.RepackFunc repackFunc msgstream.RepackFunc
consumers []rocksmq.Consumer consumers []rocksmq.Consumer
producers []string producers []string
...@@ -35,17 +45,18 @@ type RmqMsgStream struct { ...@@ -35,17 +45,18 @@ type RmqMsgStream struct {
streamCancel func() streamCancel func()
} }
func NewRmqMsgStream(ctx context.Context, rmq *rocksmq.RocksMQ, receiveBufSize int64) *RmqMsgStream { func newRmqMsgStream(ctx context.Context, receiveBufSize int64,
unmarshal msgstream.UnmarshalDispatcher) (*RmqMsgStream, error) {
streamCtx, streamCancel := context.WithCancel(ctx) streamCtx, streamCancel := context.WithCancel(ctx)
receiveBuf := make(chan *msgstream.MsgPack, receiveBufSize) receiveBuf := make(chan *msgstream.MsgPack, receiveBufSize)
stream := &RmqMsgStream{ stream := &RmqMsgStream{
ctx: streamCtx, ctx: streamCtx,
rmq: nil,
receiveBuf: receiveBuf, receiveBuf: receiveBuf,
unmarshal: unmarshal,
streamCancel: streamCancel, streamCancel: streamCancel,
} }
return stream return stream, nil
} }
func (ms *RmqMsgStream) Start() { func (ms *RmqMsgStream) Start() {
...@@ -59,25 +70,32 @@ func (ms *RmqMsgStream) Start() { ...@@ -59,25 +70,32 @@ func (ms *RmqMsgStream) Start() {
func (ms *RmqMsgStream) Close() { func (ms *RmqMsgStream) Close() {
} }
func (ms *RmqMsgStream) CreateProducers(channels []string) error { type propertiesReaderWriter struct {
ppMap map[string]string
}
func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
func (ms *RmqMsgStream) AsProducer(channels []string) {
for _, channel := range channels { for _, channel := range channels {
// TODO(yhz): Here may allow to create an existing channel // TODO(yhz): Here may allow to create an existing channel
if err := ms.rmq.CreateChannel(channel); err != nil { if err := rocksmq.Rmq.CreateChannel(channel); err != nil {
return err errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
} }
} }
return nil
} }
func (ms *RmqMsgStream) CreateConsumers(channels []string, groupName string) error { func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) {
for _, channelName := range channels { for _, channelName := range channels {
if err := ms.rmq.CreateConsumerGroup(groupName, channelName); err != nil { if err := rocksmq.Rmq.CreateConsumerGroup(groupName, channelName); err != nil {
return err panic(err.Error())
} }
msgNum := make(chan int) msgNum := make(chan int)
ms.consumers = append(ms.consumers, rocksmq.Consumer{GroupName: groupName, ChannelName: channelName, MsgNum: msgNum}) ms.consumers = append(ms.consumers, rocksmq.Consumer{GroupName: groupName, ChannelName: channelName, MsgNum: msgNum})
} }
return nil
} }
func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
...@@ -172,7 +190,30 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { ...@@ -172,7 +190,30 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
} }
msg := make([]rocksmq.ProducerMessage, 0) msg := make([]rocksmq.ProducerMessage, 0)
msg = append(msg, *rocksmq.NewProducerMessage(m)) msg = append(msg, *rocksmq.NewProducerMessage(m))
if err := ms.rmq.Produce(ms.producers[k], msg); err != nil { if err := rocksmq.Rmq.Produce(ms.producers[k], msg); err != nil {
return err
}
}
}
return nil
}
func (ms *RmqMsgStream) Broadcast(msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := v.Marshal(v)
if err != nil {
return err
}
m, err := msgstream.ConvertToByteArray(mb)
if err != nil {
return err
}
msg := make([]rocksmq.ProducerMessage, 0)
msg = append(msg, *rocksmq.NewProducerMessage(m))
for i := 0; i < producerLen; i++ {
if err := rocksmq.Rmq.Produce(ms.producers[i], msg); err != nil {
return err return err
} }
} }
...@@ -221,7 +262,7 @@ func (ms *RmqMsgStream) bufMsgPackToChannel() { ...@@ -221,7 +262,7 @@ func (ms *RmqMsgStream) bufMsgPackToChannel() {
} }
msgNum := value.Interface().(int) msgNum := value.Interface().(int)
rmqMsg, err := ms.rmq.Consume(ms.consumers[chosen].GroupName, ms.consumers[chosen].ChannelName, msgNum) rmqMsg, err := rocksmq.Rmq.Consume(ms.consumers[chosen].GroupName, ms.consumers[chosen].ChannelName, msgNum)
if err != nil { if err != nil {
log.Printf("Failed to consume message in rocksmq, error = %v", err) log.Printf("Failed to consume message in rocksmq, error = %v", err)
continue continue
...@@ -261,5 +302,23 @@ func (ms *RmqMsgStream) bufMsgPackToChannel() { ...@@ -261,5 +302,23 @@ func (ms *RmqMsgStream) bufMsgPackToChannel() {
} }
func (ms *RmqMsgStream) Chan() <-chan *msgstream.MsgPack { func (ms *RmqMsgStream) Chan() <-chan *msgstream.MsgPack {
return nil return ms.receiveBuf
}
func (ms *RmqMsgStream) Seek(offset *msgstream.MsgPosition) error {
for i := 0; i < len(ms.consumers); i++ {
if ms.consumers[i].ChannelName == offset.ChannelName {
messageID, err := strconv.ParseInt(offset.MsgID, 10, 64)
if err != nil {
return err
}
err = rocksmq.Rmq.Seek(ms.consumers[i].GroupName, ms.consumers[i].ChannelName, messageID)
if err != nil {
return err
}
return nil
}
}
return errors.New("msgStream seek fail")
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册