diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index f70ce3d914aaf25495de14edaba0384bd02b2562..1aa32299543fb939b93b9ea22a631d4df2634cdf 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -32,6 +32,7 @@ type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg type RepackFunc = msgstream.RepackFunc type Consumer = pulsar.Consumer type Producer = pulsar.Producer +type MessageID = pulsar.MessageID type UnmarshalDispatcher = msgstream.UnmarshalDispatcher type PulsarMsgStream struct { @@ -46,6 +47,8 @@ type PulsarMsgStream struct { wait *sync.WaitGroup streamCancel func() pulsarBufSize int64 + consumerLock *sync.Mutex + consumerReflects []reflect.SelectCase } func newPulsarMsgStream(ctx context.Context, @@ -58,22 +61,30 @@ func newPulsarMsgStream(ctx context.Context, producers := make([]Producer, 0) consumers := make([]Consumer, 0) consumerChannels := make([]string, 0) + consumerReflects := make([]reflect.SelectCase, 0) + receiveBuf := make(chan *MsgPack, receiveBufSize) + + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) + if err != nil { + defer streamCancel() + log.Printf("Set pulsar client failed, error = %v", err) + return nil, err + } + stream := &PulsarMsgStream{ ctx: streamCtx, - streamCancel: streamCancel, + client: client, producers: producers, consumers: consumers, consumerChannels: consumerChannels, unmarshal: unmarshal, pulsarBufSize: pulsarBufSize, + receiveBuf: receiveBuf, + streamCancel: streamCancel, + consumerReflects: consumerReflects, + consumerLock: &sync.Mutex{}, } - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) - if err != nil { - log.Printf("Set pulsar client failed, error = %v", err) - return nil, err - } - stream.client = client - stream.receiveBuf = make(chan *MsgPack, receiveBufSize) + return stream, nil } @@ -118,7 +129,14 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string, return errors.New("pulsar is not ready, consumer is nil") } + ms.consumerLock.Lock() ms.consumers = append(ms.consumers, pc) + ms.consumerChannels = append(ms.consumerChannels, channels[i]) + ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(pc.Chan()), + }) + ms.consumerLock.Unlock() return nil } err := util.Retry(10, time.Millisecond*200, fn) @@ -299,12 +317,6 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { func (ms *PulsarMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() - cases := make([]reflect.SelectCase, len(ms.consumers)) - for i := 0; i < len(ms.consumers); i++ { - ch := ms.consumers[i].Chan() - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} - } - for { select { case <-ms.ctx.Done(): @@ -314,7 +326,9 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { tsMsgList := make([]TsMsg, 0) for { - chosen, value, ok := reflect.Select(cases) + ms.consumerLock.Lock() + chosen, value, ok := reflect.Select(ms.consumerReflects) + ms.consumerLock.Unlock() if !ok { log.Printf("channel closed") return @@ -339,6 +353,11 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { log.Printf("Failed to unmarshal tsMsg, error = %v", err) continue } + + tsMsg.SetPosition(&msgstream.MsgPosition{ + ChannelName: filepath.Base(pulsarMsg.Topic()), + MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), + }) tsMsgList = append(tsMsgList, tsMsg) noMoreMessage := true @@ -386,6 +405,7 @@ func (ms *PulsarMsgStream) Seek(mp *internalpb2.MsgPosition) error { type PulsarTtMsgStream struct { PulsarMsgStream unsolvedBuf map[Consumer][]TsMsg + unsolvedMutex *sync.Mutex lastTimeStamp Timestamp } @@ -394,27 +414,53 @@ func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal msgstream.UnmarshalDispatcher) (*PulsarTtMsgStream, error) { - - streamCtx, streamCancel := context.WithCancel(ctx) - pulsarMsgStream := PulsarMsgStream{ - ctx: streamCtx, - streamCancel: streamCancel, - pulsarBufSize: pulsarBufSize, - unmarshal: unmarshal, - } - - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) + pulsarMsgStream, err := newPulsarMsgStream(ctx, address, receiveBufSize, pulsarBufSize, unmarshal) if err != nil { - log.Printf("Set pulsar client failed, error = %v", err) return nil, err } - pulsarMsgStream.client = client - pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize) + unsolvedBuf := make(map[Consumer][]TsMsg) + return &PulsarTtMsgStream{ - PulsarMsgStream: pulsarMsgStream, + PulsarMsgStream: *pulsarMsgStream, + unsolvedBuf: unsolvedBuf, + unsolvedMutex: &sync.Mutex{}, }, nil } +func (ms *PulsarTtMsgStream) AsConsumer(channels []string, + subName string) { + for i := 0; i < len(channels); i++ { + fn := func() error { + receiveChannel := make(chan pulsar.ConsumerMessage, ms.pulsarBufSize) + pc, err := ms.client.Subscribe(pulsar.ConsumerOptions{ + Topic: channels[i], + SubscriptionName: subName, + Type: pulsar.KeyShared, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + MessageChannel: receiveChannel, + }) + if err != nil { + return err + } + if pc == nil { + return errors.New("pulsar is not ready, consumer is nil") + } + + ms.consumerLock.Lock() + ms.consumers = append(ms.consumers, pc) + ms.unsolvedBuf[pc] = make([]TsMsg, 0) + ms.consumerChannels = append(ms.consumerChannels, channels[i]) + ms.consumerLock.Unlock() + return nil + } + err := util.Retry(10, time.Millisecond*200, fn) + if err != nil { + errMsg := "Failed to create consumer " + channels[i] + ", error = " + err.Error() + panic(errMsg) + } + } +} + func (ms *PulsarTtMsgStream) Start() { ms.wait = &sync.WaitGroup{} if ms.consumers != nil { @@ -428,33 +474,32 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { ms.unsolvedBuf = make(map[Consumer][]TsMsg) isChannelReady := make(map[Consumer]bool) eofMsgTimeStamp := make(map[Consumer]Timestamp) - for _, consumer := range ms.consumers { - ms.unsolvedBuf[consumer] = make([]TsMsg, 0) - } + for { select { case <-ms.ctx.Done(): return default: wg := sync.WaitGroup{} - mu := sync.Mutex{} findMapMutex := sync.RWMutex{} + ms.consumerLock.Lock() for _, consumer := range ms.consumers { if isChannelReady[consumer] { continue } wg.Add(1) - go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &mu, &findMapMutex) + go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex) } wg.Wait() timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex) + ms.consumerLock.Unlock() if !ok || timeStamp <= ms.lastTimeStamp { //log.Printf("All timeTick's timestamps are inconsistent") continue } - timeTickBuf := make([]TsMsg, 0) msgPositions := make([]*internalpb2.MsgPosition, 0) + ms.unsolvedMutex.Lock() for consumer, msgs := range ms.unsolvedBuf { tempBuffer := make([]TsMsg, 0) var timeTickMsg TsMsg @@ -485,6 +530,7 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { }) } } + ms.unsolvedMutex.Unlock() msgPack := MsgPack{ BeginTs: ms.lastTimeStamp, @@ -502,7 +548,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, eofMsgMap map[Consumer]Timestamp, wg *sync.WaitGroup, - mu *sync.Mutex, findMapMutex *sync.RWMutex) { defer wg.Done() for { @@ -519,14 +564,13 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { - log.Printf("Failed to unmarshal, error = %v", err) + log.Printf("Failed to unmarshal message header, error = %v", err) + continue } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) - if tsMsg == nil && err != nil { - panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type") - } if err != nil { - log.Printf("Failed to unmarshal, error = %v", err) + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + continue } // set pulsar info to tsMsg tsMsg.SetPosition(&msgstream.MsgPosition{ @@ -534,9 +578,9 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), }) - mu.Lock() + ms.unsolvedMutex.Lock() ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) - mu.Unlock() + ms.unsolvedMutex.Unlock() if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick { findMapMutex.Lock() @@ -549,50 +593,60 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, } func (ms *PulsarTtMsgStream) Seek(mp *internalpb2.MsgPosition) error { + var consumer Consumer + var messageID MessageID for index, channel := range ms.consumerChannels { if filepath.Base(channel) == filepath.Base(mp.ChannelName) { - messageID, err := typeutil.StringToPulsarMsgID(mp.MsgID) - if err != nil { - return err - } - consumer := ms.consumers[index] - err = (consumer).Seek(messageID) + seekMsgID, err := typeutil.StringToPulsarMsgID(mp.MsgID) if err != nil { return err } + consumer = ms.consumers[index] + messageID = seekMsgID + break + } + } - for { - select { - case <-ms.ctx.Done(): - return nil - case pulsarMsg, ok := <-consumer.Chan(): - if !ok { - return errors.New("consumer closed") - } - consumer.Ack(pulsarMsg) + if consumer != nil { + err := (consumer).Seek(messageID) + if err != nil { + return err + } - headerMsg := commonpb.MsgHeader{} - err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) - if err != nil { - log.Printf("Failed to unmarshal msgHeader, error = %v", err) - } - tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) - if tsMsg == nil && err != nil { - panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type") + ms.unsolvedMutex.Lock() + ms.unsolvedBuf[consumer] = make([]TsMsg, 0) + for { + select { + case <-ms.ctx.Done(): + return nil + case pulsarMsg, ok := <-consumer.Chan(): + if !ok { + return errors.New("consumer closed") + } + consumer.Ack(pulsarMsg) + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal message header, error = %v", err) + } + tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + } + if tsMsg.Type() == commonpb.MsgType_kTimeTick { + if tsMsg.BeginTs() >= mp.Timestamp { + ms.unsolvedMutex.Unlock() + return nil } - if err != nil { - log.Printf("Failed to unmarshal pulsarMsg, error = %v", err) - } - if tsMsg.Type() == commonpb.MsgType_kTimeTick { - if tsMsg.BeginTs() >= mp.Timestamp { - return nil - } - continue - } - if tsMsg.BeginTs() > mp.Timestamp { - ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) - } + continue + } + if tsMsg.BeginTs() > mp.Timestamp { + tsMsg.SetPosition(&msgstream.MsgPosition{ + ChannelName: filepath.Base(pulsarMsg.Topic()), + MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), + }) + ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) } } } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go index 2fcfad927eb5c7680ea88df5763da9bbb67045f7..b406cbb7429332845d8e6220ffe9a6c27d27b0bb 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go @@ -568,7 +568,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 9, 9)) msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack2.Msgs, getTimeTickMsg(11, 11, 11)) + msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11)) msgPack5 := MsgPack{} msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15))