mq_msgstream.go 20.2 KB
Newer Older
X
xige-16 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package msgstream
Y
yukun 已提交
13 14 15 16

import (
	"context"
	"errors"
C
Cai Yudong 已提交
17
	"fmt"
Y
yukun 已提交
18 19 20 21 22
	"path/filepath"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
28
	"github.com/opentracing/opentracing-go"
Y
yukun 已提交
29 30 31
	"go.uber.org/zap"
)

X
Xiangyu Wang 已提交
32
type mqMsgStream struct {
Y
yukun 已提交
33
	ctx              context.Context
X
Xiangyu Wang 已提交
34 35
	client           mqclient.Client
	producers        map[string]mqclient.Producer
Y
yukun 已提交
36
	producerChannels []string
X
Xiangyu Wang 已提交
37
	consumers        map[string]mqclient.Consumer
Y
yukun 已提交
38 39 40 41 42 43 44 45 46 47 48
	consumerChannels []string
	repackFunc       RepackFunc
	unmarshal        UnmarshalDispatcher
	receiveBuf       chan *MsgPack
	wait             *sync.WaitGroup
	streamCancel     func()
	bufSize          int64
	producerLock     *sync.Mutex
	consumerLock     *sync.Mutex
}

X
Xiangyu Wang 已提交
49
func NewMqMsgStream(ctx context.Context,
Y
yukun 已提交
50 51
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
52 53
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*mqMsgStream, error) {
Y
yukun 已提交
54 55

	streamCtx, streamCancel := context.WithCancel(ctx)
X
Xiangyu Wang 已提交
56 57
	producers := make(map[string]mqclient.Producer)
	consumers := make(map[string]mqclient.Consumer)
Y
yukun 已提交
58 59 60 61
	producerChannels := make([]string, 0)
	consumerChannels := make([]string, 0)
	receiveBuf := make(chan *MsgPack, receiveBufSize)

X
Xiangyu Wang 已提交
62
	stream := &mqMsgStream{
Y
yukun 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
		ctx:              streamCtx,
		client:           client,
		producers:        producers,
		producerChannels: producerChannels,
		consumers:        consumers,
		consumerChannels: consumerChannels,
		unmarshal:        unmarshal,
		bufSize:          bufSize,
		receiveBuf:       receiveBuf,
		streamCancel:     streamCancel,
		producerLock:     &sync.Mutex{},
		consumerLock:     &sync.Mutex{},
		wait:             &sync.WaitGroup{},
	}

	return stream, nil
}

X
Xiangyu Wang 已提交
81
func (ms *mqMsgStream) AsProducer(channels []string) {
Y
yukun 已提交
82 83
	for _, channel := range channels {
		fn := func() error {
X
Xiangyu Wang 已提交
84
			pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel})
Y
yukun 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97
			if err != nil {
				return err
			}
			if pp == nil {
				return errors.New("Producer is nil")
			}

			ms.producerLock.Lock()
			ms.producers[channel] = pp
			ms.producerChannels = append(ms.producerChannels, channel)
			ms.producerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
98
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
99 100 101 102 103 104 105
		if err != nil {
			errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

106
func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
Y
yukun 已提交
107 108 109 110 111
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
112 113
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
114 115
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
116 117
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
118 119 120 121 122 123 124 125 126
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

127
			ms.consumerLock.Lock()
Y
yukun 已提交
128 129
			ms.consumers[channel] = pc
			ms.consumerChannels = append(ms.consumerChannels, channel)
130
			ms.consumerLock.Unlock()
Y
yukun 已提交
131 132 133 134
			ms.wait.Add(1)
			go ms.receiveMsg(pc)
			return nil
		}
X
Xiangyu Wang 已提交
135
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
136 137 138 139 140 141 142
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
143
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
Y
yukun 已提交
144 145 146
	ms.repackFunc = repackFunc
}

X
Xiangyu Wang 已提交
147
func (ms *mqMsgStream) Start() {
Y
yukun 已提交
148 149
}

X
Xiangyu Wang 已提交
150
func (ms *mqMsgStream) Close() {
Y
yukun 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	ms.streamCancel()
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

169
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
Y
yukun 已提交
170 171 172 173
	if len(tsMsgs) <= 0 {
		return nil
	}
	reBucketValues := make([][]int32, len(tsMsgs))
174
	channelNum := uint32(len(ms.producerChannels))
175

176 177 178
	if channelNum == 0 {
		return nil
	}
Y
yukun 已提交
179 180 181 182
	for idx, tsMsg := range tsMsgs {
		hashValues := tsMsg.HashKeys()
		bucketValues := make([]int32, len(hashValues))
		for index, hashValue := range hashValues {
183
			bucketValues[index] = int32(hashValue % channelNum)
Y
yukun 已提交
184 185 186
		}
		reBucketValues[idx] = bucketValues
	}
187 188
	return reBucketValues
}
Y
yukun 已提交
189

190 191 192 193
func (ms *mqMsgStream) GetProduceChannels() []string {
	return ms.producerChannels
}

194
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
195
	if msgPack == nil || len(msgPack.Msgs) <= 0 {
196 197 198 199 200 201
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
	if len(ms.producers) <= 0 {
		return errors.New("nil producer in msg stream")
	}
202
	tsMsgs := msgPack.Msgs
203
	reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
Y
yukun 已提交
204 205 206 207 208 209 210 211
	var result map[int32]*MsgPack
	var err error
	if ms.repackFunc != nil {
		result, err = ms.repackFunc(tsMsgs, reBucketValues)
	} else {
		msgType := (tsMsgs[0]).Type()
		switch msgType {
		case commonpb.MsgType_Insert:
X
Xiangyu Wang 已提交
212
			result, err = InsertRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
213
		case commonpb.MsgType_Delete:
X
Xiangyu Wang 已提交
214
			result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
215
		default:
X
Xiangyu Wang 已提交
216
			result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
217 218 219 220 221 222 223 224
		}
	}
	if err != nil {
		return err
	}
	for k, v := range result {
		channel := ms.producerChannels[k]
		for i := 0; i < len(v.Msgs); i++ {
X
Xiangyu Wang 已提交
225
			sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
X
Xiangyu Wang 已提交
226

Y
yukun 已提交
227 228 229 230 231
			mb, err := v.Msgs[i].Marshal(v.Msgs[i])
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
232
			m, err := ConvertToByteArray(mb)
Y
yukun 已提交
233 234 235 236
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
237
			msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254

			trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

			if err := ms.producers[channel].Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
			sp.Finish()
		}
	}
	return nil
}

X
Xiangyu Wang 已提交
255
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
256 257 258 259
	if msgPack == nil || len(msgPack.Msgs) <= 0 {
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
Y
yukun 已提交
260
	for _, v := range msgPack.Msgs {
X
Xiangyu Wang 已提交
261
		sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
X
Xiangyu Wang 已提交
262

Y
yukun 已提交
263 264 265 266 267
		mb, err := v.Marshal(v)
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
268
		m, err := ConvertToByteArray(mb)
Y
yukun 已提交
269 270 271 272
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
273
		msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293

		trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

		ms.producerLock.Lock()
		for _, producer := range ms.producers {
			if err := producer.Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
		}
		ms.producerLock.Unlock()
		sp.Finish()
	}
	return nil
}

X
Xiangyu Wang 已提交
294
func (ms *mqMsgStream) Consume() *MsgPack {
Y
yukun 已提交
295 296
	for {
		select {
297 298 299
		case <-ms.ctx.Done():
			//log.Debug("context closed")
			return nil
Y
yukun 已提交
300 301 302 303 304 305 306 307 308 309
		case cm, ok := <-ms.receiveBuf:
			if !ok {
				log.Debug("buf chan closed")
				return nil
			}
			return cm
		}
	}
}

310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg mqclient.ConsumerMessage) (TsMsg, error) {
	header := commonpb.MsgHeader{}
	err := proto.Unmarshal(msg.Payload(), &header)
	if err != nil {
		return nil, fmt.Errorf("Failed to unmarshal message header, err %s", err.Error())
	}
	tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), header.Base.MsgType)
	if err != nil {
		return nil, fmt.Errorf("Failed to unmarshal tsMsg, err %s", err.Error())
	}

	// set msg info to tsMsg
	tsMsg.SetPosition(&MsgPosition{
		ChannelName: filepath.Base(msg.Topic()),
		MsgID:       msg.ID().Serialize(),
	})

	return tsMsg, nil
}

X
Xiangyu Wang 已提交
330
func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
Y
yukun 已提交
331 332 333 334 335 336 337 338 339 340 341
	defer ms.wait.Done()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				return
			}
			consumer.Ack(msg)
342 343

			tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
Y
yukun 已提交
344
			if err != nil {
345
				log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
Y
yukun 已提交
346 347 348
				continue
			}

X
Xiangyu Wang 已提交
349
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
X
Xiangyu Wang 已提交
350 351 352 353
			if ok {
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
			}

Y
yukun 已提交
354 355 356 357 358 359 360 361
			msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
			ms.receiveBuf <- &msgPack

			sp.Finish()
		}
	}
}

X
Xiangyu Wang 已提交
362
func (ms *mqMsgStream) Chan() <-chan *MsgPack {
Y
yukun 已提交
363 364 365
	return ms.receiveBuf
}

C
Cai Yudong 已提交
366 367 368 369 370 371
func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
	for _, mp := range msgPositions {
		consumer, ok := ms.consumers[mp.ChannelName]
		if !ok {
			return fmt.Errorf("channel %s not subscribed", mp.ChannelName)
		}
X
xige-16 已提交
372
		messageID, err := ms.client.BytesToMsgID(mp.MsgID)
Y
yukun 已提交
373 374 375 376 377 378 379 380
		if err != nil {
			return err
		}
		err = consumer.Seek(messageID)
		if err != nil {
			return err
		}
	}
C
Cai Yudong 已提交
381
	return nil
Y
yukun 已提交
382 383
}

X
Xiangyu Wang 已提交
384 385
type MqTtMsgStream struct {
	mqMsgStream
386 387 388 389 390 391 392 393 394
	chanMsgBuf         map[mqclient.Consumer][]TsMsg
	chanMsgPos         map[mqclient.Consumer]*internalpb.MsgPosition
	chanStopChan       map[mqclient.Consumer]chan bool
	chanTtMsgTime      map[mqclient.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
Y
yukun 已提交
395 396
}

X
Xiangyu Wang 已提交
397
func NewMqTtMsgStream(ctx context.Context,
Y
yukun 已提交
398 399
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
400 401 402
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error) {
	msgStream, err := NewMqMsgStream(ctx, receiveBufSize, bufSize, client, unmarshal)
Y
yukun 已提交
403 404 405
	if err != nil {
		return nil, err
	}
406 407 408 409
	chanMsgBuf := make(map[mqclient.Consumer][]TsMsg)
	chanMsgPos := make(map[mqclient.Consumer]*internalpb.MsgPosition)
	chanStopChan := make(map[mqclient.Consumer]chan bool)
	chanTtMsgTime := make(map[mqclient.Consumer]Timestamp)
Y
yukun 已提交
410 411
	syncConsumer := make(chan int, 1)

X
Xiangyu Wang 已提交
412
	return &MqTtMsgStream{
413 414 415 416 417 418 419 420 421
		mqMsgStream:        *msgStream,
		chanMsgBuf:         chanMsgBuf,
		chanMsgPos:         chanMsgPos,
		chanStopChan:       chanStopChan,
		chanTtMsgTime:      chanTtMsgTime,
		chanMsgBufMutex:    &sync.Mutex{},
		chanTtMsgTimeMutex: &sync.RWMutex{},
		chanWaitGroup:      &sync.WaitGroup{},
		syncConsumer:       syncConsumer,
Y
yukun 已提交
422 423 424
	}, nil
}

X
Xiangyu Wang 已提交
425
func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string) {
Y
yukun 已提交
426 427 428 429 430
	if len(ms.consumers) == 0 {
		ms.syncConsumer <- 1
	}
	ms.consumers[channel] = consumer
	ms.consumerChannels = append(ms.consumerChannels, channel)
431 432
	ms.chanMsgBuf[consumer] = make([]TsMsg, 0)
	ms.chanMsgPos[consumer] = &internalpb.MsgPosition{
Y
yukun 已提交
433
		ChannelName: channel,
X
xige-16 已提交
434
		MsgID:       make([]byte, 0),
Y
yukun 已提交
435 436
		Timestamp:   ms.lastTimeStamp,
	}
437 438
	ms.chanStopChan[consumer] = make(chan bool)
	ms.chanTtMsgTime[consumer] = 0
Y
yukun 已提交
439 440
}

441 442
// AsConsumer subscribes channels as consumer for a MsgStream
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
Y
yukun 已提交
443 444 445 446 447
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
448 449
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
450 451
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
452 453
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
454 455 456 457 458 459 460 461 462 463 464 465 466 467
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumerLock.Lock()
			ms.addConsumer(pc, channel)
			ms.consumerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
468
		err := Retry(10, time.Millisecond*200, fn)
Y
yukun 已提交
469 470 471 472 473 474 475
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
476
func (ms *MqTtMsgStream) Start() {
Y
yukun 已提交
477 478 479 480 481 482
	if ms.consumers != nil {
		ms.wait.Add(1)
		go ms.bufMsgPackToChannel()
	}
}

X
Xiangyu Wang 已提交
483
func (ms *MqTtMsgStream) Close() {
Y
yukun 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
	ms.streamCancel()
	close(ms.syncConsumer)
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

X
Xiangyu Wang 已提交
503
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Y
yukun 已提交
504
	defer ms.wait.Done()
505
	chanTtMsgSync := make(map[mqclient.Consumer]bool)
Y
yukun 已提交
506

507
	// block here until addConsumer
Y
yukun 已提交
508 509 510 511 512 513 514 515 516 517 518
	if _, ok := <-ms.syncConsumer; !ok {
		log.Debug("consumer closed!")
		return
	}

	for {
		select {
		case <-ms.ctx.Done():
			return
		default:
			ms.consumerLock.Lock()
519 520

			// wait all channels get ttMsg
Y
yukun 已提交
521
			for _, consumer := range ms.consumers {
522 523 524
				if !chanTtMsgSync[consumer] {
					ms.chanWaitGroup.Add(1)
					go ms.consumeToTtMsg(consumer)
Y
yukun 已提交
525 526
				}
			}
527 528 529 530 531
			ms.chanWaitGroup.Wait()

			// block here until all channels reach same timetick
			currTs, ok := ms.allChanReachSameTtMsg(chanTtMsgSync)
			if !ok || currTs <= ms.lastTimeStamp {
Y
yukun 已提交
532
				//log.Printf("All timeTick's timestamps are inconsistent")
533
				ms.consumerLock.Unlock()
Y
yukun 已提交
534 535
				continue
			}
536

Y
yukun 已提交
537 538 539
			timeTickBuf := make([]TsMsg, 0)
			startMsgPosition := make([]*internalpb.MsgPosition, 0)
			endMsgPositions := make([]*internalpb.MsgPosition, 0)
540 541
			ms.chanMsgBufMutex.Lock()
			for consumer, msgs := range ms.chanMsgBuf {
Y
yukun 已提交
542 543 544 545 546 547 548 549 550 551
				if len(msgs) == 0 {
					continue
				}
				tempBuffer := make([]TsMsg, 0)
				var timeTickMsg TsMsg
				for _, v := range msgs {
					if v.Type() == commonpb.MsgType_TimeTick {
						timeTickMsg = v
						continue
					}
552
					if v.EndTs() <= currTs {
Y
yukun 已提交
553
						timeTickBuf = append(timeTickBuf, v)
554
						//log.Debug("pack msg", zap.Uint64("curr", v.EndTs()), zap.Uint64("currTs", currTs))
Y
yukun 已提交
555 556 557 558
					} else {
						tempBuffer = append(tempBuffer, v)
					}
				}
559
				ms.chanMsgBuf[consumer] = tempBuffer
Y
yukun 已提交
560

561
				startMsgPosition = append(startMsgPosition, ms.chanMsgPos[consumer])
Y
yukun 已提交
562 563
				var newPos *internalpb.MsgPosition
				if len(tempBuffer) > 0 {
564
					// if tempBuffer is not empty, use tempBuffer[0] to seek
Y
yukun 已提交
565 566 567
					newPos = &internalpb.MsgPosition{
						ChannelName: tempBuffer[0].Position().ChannelName,
						MsgID:       tempBuffer[0].Position().MsgID,
568
						Timestamp:   currTs,
569
						MsgGroup:    consumer.Subscription(),
Y
yukun 已提交
570 571
					}
					endMsgPositions = append(endMsgPositions, newPos)
572 573
				} else if timeTickMsg != nil {
					// if tempBuffer is empty, use timeTickMsg to seek
Y
yukun 已提交
574 575 576
					newPos = &internalpb.MsgPosition{
						ChannelName: timeTickMsg.Position().ChannelName,
						MsgID:       timeTickMsg.Position().MsgID,
577
						Timestamp:   currTs,
578
						MsgGroup:    consumer.Subscription(),
Y
yukun 已提交
579 580 581
					}
					endMsgPositions = append(endMsgPositions, newPos)
				}
582
				ms.chanMsgPos[consumer] = newPos
Y
yukun 已提交
583
			}
584
			ms.chanMsgBufMutex.Unlock()
585
			ms.consumerLock.Unlock()
Y
yukun 已提交
586 587 588

			msgPack := MsgPack{
				BeginTs:        ms.lastTimeStamp,
589
				EndTs:          currTs,
Y
yukun 已提交
590 591 592 593 594
				Msgs:           timeTickBuf,
				StartPositions: startMsgPosition,
				EndPositions:   endMsgPositions,
			}

595
			//log.Debug("send msg pack", zap.Int("len", len(msgPack.Msgs)), zap.Uint64("currTs", currTs))
Y
yukun 已提交
596
			ms.receiveBuf <- &msgPack
597
			ms.lastTimeStamp = currTs
Y
yukun 已提交
598 599 600 601
		}
	}
}

602 603 604
// Save all msgs into chanMsgBuf[] till receive one ttMsg
func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqclient.Consumer) {
	defer ms.chanWaitGroup.Done()
Y
yukun 已提交
605 606 607 608
	for {
		select {
		case <-ms.ctx.Done():
			return
609 610
		case <-ms.chanStopChan[consumer]:
			return
Y
yukun 已提交
611 612 613 614 615 616 617
		case msg, ok := <-consumer.Chan():
			if !ok {
				log.Debug("consumer closed!")
				return
			}
			consumer.Ack(msg)

618
			tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
Y
yukun 已提交
619
			if err != nil {
620
				log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
Y
yukun 已提交
621 622 623
				continue
			}

X
Xiangyu Wang 已提交
624
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
Y
yukun 已提交
625
			if ok {
X
Xiangyu Wang 已提交
626
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
Y
yukun 已提交
627 628
			}

629 630 631
			ms.chanMsgBufMutex.Lock()
			ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
			ms.chanMsgBufMutex.Unlock()
Y
yukun 已提交
632

633 634 635 636
			if tsMsg.Type() == commonpb.MsgType_TimeTick {
				ms.chanTtMsgTimeMutex.Lock()
				ms.chanTtMsgTime[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
				ms.chanTtMsgTimeMutex.Unlock()
Y
yukun 已提交
637 638 639 640 641 642 643 644
				sp.Finish()
				return
			}
			sp.Finish()
		}
	}
}

645 646 647
// return true only when all channels reach same timetick
func (ms *MqTtMsgStream) allChanReachSameTtMsg(chanTtMsgSync map[mqclient.Consumer]bool) (Timestamp, bool) {
	tsMap := make(map[Timestamp]int)
Y
yukun 已提交
648
	var maxTime Timestamp = 0
649 650 651 652
	for _, t := range ms.chanTtMsgTime {
		tsMap[t]++
		if t > maxTime {
			maxTime = t
Y
yukun 已提交
653 654
		}
	}
655 656 657 658
	// when all channels reach same timetick, timeMap should contain only 1 timestamp
	if len(tsMap) <= 1 {
		for consumer := range ms.chanTtMsgTime {
			chanTtMsgSync[consumer] = false
Y
yukun 已提交
659 660 661
		}
		return maxTime, true
	}
662 663 664 665
	for consumer := range ms.chanTtMsgTime {
		ms.chanTtMsgTimeMutex.RLock()
		chanTtMsgSync[consumer] = (ms.chanTtMsgTime[consumer] == maxTime)
		ms.chanTtMsgTimeMutex.RUnlock()
Y
yukun 已提交
666 667 668 669 670
	}

	return 0, false
}

C
Cai Yudong 已提交
671 672
// Seek to the specified position
func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
X
Xiangyu Wang 已提交
673
	var consumer mqclient.Consumer
C
Cai Yudong 已提交
674
	var mp *MsgPosition
Y
yukun 已提交
675
	var err error
X
xige-16 已提交
676
	fn := func() error {
C
Cai Yudong 已提交
677 678 679 680
		if _, ok := ms.consumers[mp.ChannelName]; ok {
			return fmt.Errorf("the channel should not been subscribed")
		}

X
Xiangyu Wang 已提交
681 682
		receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
		consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{
C
Cai Yudong 已提交
683 684
			Topic:                       mp.ChannelName,
			SubscriptionName:            mp.MsgGroup,
X
Xiangyu Wang 已提交
685 686
			SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
			Type:                        mqclient.KeyShared,
X
xige-16 已提交
687 688 689 690 691 692
			MessageChannel:              receiveChannel,
		})
		if err != nil {
			return err
		}
		if consumer == nil {
C
Cai Yudong 已提交
693
			return fmt.Errorf("consumer is nil")
X
xige-16 已提交
694
		}
Y
yukun 已提交
695

X
xige-16 已提交
696 697 698 699 700 701 702 703 704 705 706
		seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
		if err != nil {
			return err
		}
		err = consumer.Seek(seekMsgID)
		if err != nil {
			return err
		}

		return nil
	}
Y
yukun 已提交
707

C
Cai Yudong 已提交
708 709
	ms.consumerLock.Lock()
	defer ms.consumerLock.Unlock()
Y
yukun 已提交
710

C
Cai Yudong 已提交
711 712 713 714 715
	for idx := range msgPositions {
		mp = msgPositions[idx]
		if len(mp.MsgID) == 0 {
			return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
		}
Y
yukun 已提交
716

C
Cai Yudong 已提交
717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
		if err = Retry(20, time.Millisecond*200, fn); err != nil {
			return fmt.Errorf("Failed to seek, error %s", err.Error())
		}
		ms.addConsumer(consumer, mp.ChannelName)

		//TODO: May cause problem
		//if len(consumer.Chan()) == 0 {
		//	return nil
		//}

		runLoop := true
		for runLoop {
			select {
			case <-ms.ctx.Done():
				return nil
			case msg, ok := <-consumer.Chan():
				if !ok {
					return fmt.Errorf("consumer closed")
				}
				consumer.Ack(msg)

				headerMsg := commonpb.MsgHeader{}
				err := proto.Unmarshal(msg.Payload(), &headerMsg)
				if err != nil {
					return fmt.Errorf("Failed to unmarshal message header, err %s", err.Error())
				}
				tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
				if err != nil {
					return fmt.Errorf("Failed to unmarshal tsMsg, err %s", err.Error())
				}
				if tsMsg.Type() == commonpb.MsgType_TimeTick && tsMsg.BeginTs() >= mp.Timestamp {
					runLoop = false
					break
				} else if tsMsg.BeginTs() > mp.Timestamp {
					tsMsg.SetPosition(&MsgPosition{
						ChannelName: filepath.Base(msg.Topic()),
						MsgID:       msg.ID().Serialize(),
					})
755
					ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
Y
yukun 已提交
756 757 758 759
				}
			}
		}
	}
C
Cai Yudong 已提交
760
	return nil
Y
yukun 已提交
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
}

//TODO test InMemMsgStream
/*
type InMemMsgStream struct {
	buffer chan *MsgPack
}

func (ms *InMemMsgStream) Start() {}
func (ms *InMemMsgStream) Close() {}

func (ms *InMemMsgStream) ProduceOne(msg TsMsg) error {
	msgPack := MsgPack{}
	msgPack.BeginTs = msg.BeginTs()
	msgPack.EndTs = msg.EndTs()
	msgPack.Msgs = append(msgPack.Msgs, msg)
	buffer <- &msgPack
	return nil
}

func (ms *InMemMsgStream) Produce(msgPack *MsgPack) error {
	buffer <- msgPack
	return nil
}

func (ms *InMemMsgStream) Broadcast(msgPack *MsgPack) error {
	return ms.Produce(msgPack)
}

func (ms *InMemMsgStream) Consume() *MsgPack {
	select {
	case msgPack := <-ms.buffer:
		return msgPack
	}
}

func (ms *InMemMsgStream) Chan() <- chan *MsgPack {
	return buffer
}
*/