mq_msgstream.go 20.6 KB
Newer Older
X
xige-16 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package msgstream
Y
yukun 已提交
13 14

import (
G
godchen 已提交
15
	"bytes"
Y
yukun 已提交
16 17
	"context"
	"errors"
C
Cai Yudong 已提交
18
	"fmt"
Y
yukun 已提交
19 20 21 22 23
	"path/filepath"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
X
Xiangyu Wang 已提交
24 25 26 27 28
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
29
	"github.com/opentracing/opentracing-go"
Y
yukun 已提交
30 31 32
	"go.uber.org/zap"
)

X
Xiangyu Wang 已提交
33
type mqMsgStream struct {
Y
yukun 已提交
34
	ctx              context.Context
X
Xiangyu Wang 已提交
35 36
	client           mqclient.Client
	producers        map[string]mqclient.Producer
Y
yukun 已提交
37
	producerChannels []string
X
Xiangyu Wang 已提交
38
	consumers        map[string]mqclient.Consumer
Y
yukun 已提交
39 40 41 42 43 44 45 46 47 48 49
	consumerChannels []string
	repackFunc       RepackFunc
	unmarshal        UnmarshalDispatcher
	receiveBuf       chan *MsgPack
	wait             *sync.WaitGroup
	streamCancel     func()
	bufSize          int64
	producerLock     *sync.Mutex
	consumerLock     *sync.Mutex
}

X
Xiangyu Wang 已提交
50
func NewMqMsgStream(ctx context.Context,
Y
yukun 已提交
51 52
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
53 54
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*mqMsgStream, error) {
Y
yukun 已提交
55 56

	streamCtx, streamCancel := context.WithCancel(ctx)
X
Xiangyu Wang 已提交
57 58
	producers := make(map[string]mqclient.Producer)
	consumers := make(map[string]mqclient.Consumer)
Y
yukun 已提交
59 60 61 62
	producerChannels := make([]string, 0)
	consumerChannels := make([]string, 0)
	receiveBuf := make(chan *MsgPack, receiveBufSize)

X
Xiangyu Wang 已提交
63
	stream := &mqMsgStream{
Y
yukun 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
		ctx:              streamCtx,
		client:           client,
		producers:        producers,
		producerChannels: producerChannels,
		consumers:        consumers,
		consumerChannels: consumerChannels,
		unmarshal:        unmarshal,
		bufSize:          bufSize,
		receiveBuf:       receiveBuf,
		streamCancel:     streamCancel,
		producerLock:     &sync.Mutex{},
		consumerLock:     &sync.Mutex{},
		wait:             &sync.WaitGroup{},
	}

	return stream, nil
}

X
Xiangyu Wang 已提交
82
func (ms *mqMsgStream) AsProducer(channels []string) {
Y
yukun 已提交
83
	for _, channel := range channels {
84 85 86 87
		if len(channel) == 0 {
			log.Error("MsgStream asProducer's channel is a empty string")
			break
		}
Y
yukun 已提交
88
		fn := func() error {
X
Xiangyu Wang 已提交
89
			pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel})
Y
yukun 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102
			if err != nil {
				return err
			}
			if pp == nil {
				return errors.New("Producer is nil")
			}

			ms.producerLock.Lock()
			ms.producers[channel] = pp
			ms.producerChannels = append(ms.producerChannels, channel)
			ms.producerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
103
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
104 105 106 107 108 109 110
		if err != nil {
			errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

111
func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
Y
yukun 已提交
112 113 114 115 116
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
117 118
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
119 120
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
121 122
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
123 124 125 126 127 128 129 130 131
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

132
			ms.consumerLock.Lock()
Y
yukun 已提交
133 134
			ms.consumers[channel] = pc
			ms.consumerChannels = append(ms.consumerChannels, channel)
135
			ms.consumerLock.Unlock()
Y
yukun 已提交
136 137
			return nil
		}
X
Xiangyu Wang 已提交
138
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
139 140 141 142 143 144 145
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
146
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
Y
yukun 已提交
147 148 149
	ms.repackFunc = repackFunc
}

X
Xiangyu Wang 已提交
150
func (ms *mqMsgStream) Start() {
G
godchen 已提交
151 152 153 154
	for _, c := range ms.consumers {
		ms.wait.Add(1)
		go ms.receiveMsg(c)
	}
Y
yukun 已提交
155 156
}

X
Xiangyu Wang 已提交
157
func (ms *mqMsgStream) Close() {
Y
yukun 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	ms.streamCancel()
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

176
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
Y
yukun 已提交
177 178 179 180
	if len(tsMsgs) <= 0 {
		return nil
	}
	reBucketValues := make([][]int32, len(tsMsgs))
181
	channelNum := uint32(len(ms.producerChannels))
182

183 184 185
	if channelNum == 0 {
		return nil
	}
Y
yukun 已提交
186 187 188 189
	for idx, tsMsg := range tsMsgs {
		hashValues := tsMsg.HashKeys()
		bucketValues := make([]int32, len(hashValues))
		for index, hashValue := range hashValues {
190
			bucketValues[index] = int32(hashValue % channelNum)
Y
yukun 已提交
191 192 193
		}
		reBucketValues[idx] = bucketValues
	}
194 195
	return reBucketValues
}
Y
yukun 已提交
196

197 198 199 200
// GetProduceChannels returns the names of the channels registered through
// AsProducer. The stream's internal slice is returned directly, not a copy.
func (ms *mqMsgStream) GetProduceChannels() []string {
	return ms.producerChannels
}

201
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
202
	if msgPack == nil || len(msgPack.Msgs) <= 0 {
203 204 205 206 207 208
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
	if len(ms.producers) <= 0 {
		return errors.New("nil producer in msg stream")
	}
209
	tsMsgs := msgPack.Msgs
210
	reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
Y
yukun 已提交
211 212 213 214 215 216 217 218
	var result map[int32]*MsgPack
	var err error
	if ms.repackFunc != nil {
		result, err = ms.repackFunc(tsMsgs, reBucketValues)
	} else {
		msgType := (tsMsgs[0]).Type()
		switch msgType {
		case commonpb.MsgType_Insert:
X
Xiangyu Wang 已提交
219
			result, err = InsertRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
220
		case commonpb.MsgType_Delete:
X
Xiangyu Wang 已提交
221
			result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
222
		default:
X
Xiangyu Wang 已提交
223
			result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
224 225 226 227 228 229 230 231
		}
	}
	if err != nil {
		return err
	}
	for k, v := range result {
		channel := ms.producerChannels[k]
		for i := 0; i < len(v.Msgs); i++ {
X
Xiangyu Wang 已提交
232
			sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
X
Xiangyu Wang 已提交
233

Y
yukun 已提交
234 235 236 237 238
			mb, err := v.Msgs[i].Marshal(v.Msgs[i])
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
239
			m, err := ConvertToByteArray(mb)
Y
yukun 已提交
240 241 242 243
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
244
			msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261

			trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

			if err := ms.producers[channel].Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
			sp.Finish()
		}
	}
	return nil
}

X
Xiangyu Wang 已提交
262
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
263 264 265 266
	if msgPack == nil || len(msgPack.Msgs) <= 0 {
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
Y
yukun 已提交
267
	for _, v := range msgPack.Msgs {
X
Xiangyu Wang 已提交
268
		sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
X
Xiangyu Wang 已提交
269

Y
yukun 已提交
270 271 272 273 274
		mb, err := v.Marshal(v)
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
275
		m, err := ConvertToByteArray(mb)
Y
yukun 已提交
276 277 278 279
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
280
		msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300

		trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

		ms.producerLock.Lock()
		for _, producer := range ms.producers {
			if err := producer.Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
		}
		ms.producerLock.Unlock()
		sp.Finish()
	}
	return nil
}

X
Xiangyu Wang 已提交
301
func (ms *mqMsgStream) Consume() *MsgPack {
Y
yukun 已提交
302 303
	for {
		select {
304 305 306
		case <-ms.ctx.Done():
			//log.Debug("context closed")
			return nil
Y
yukun 已提交
307 308 309 310 311 312 313 314 315 316
		case cm, ok := <-ms.receiveBuf:
			if !ok {
				log.Debug("buf chan closed")
				return nil
			}
			return cm
		}
	}
}

317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
// getTsMsgFromConsumerMsg decodes a consumed message into a TsMsg.
//
// The payload is first unmarshalled as a commonpb.MsgHeader to learn the
// message type, then handed to the stream's UnmarshalDispatcher. The
// resulting TsMsg gets a position carrying the base name of the message topic
// and the serialized message ID.
//
// An error is returned when the header cannot be unmarshalled, when the
// header has no base (so the message type is unknown), or when the dispatcher
// fails to unmarshal the payload.
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg mqclient.ConsumerMessage) (TsMsg, error) {
	header := commonpb.MsgHeader{}
	err := proto.Unmarshal(msg.Payload(), &header)
	if err != nil {
		return nil, fmt.Errorf("Failed to unmarshal message header, err %s", err.Error())
	}
	// A payload without a base would otherwise cause a nil-pointer panic in
	// the consuming goroutine.
	if header.Base == nil {
		return nil, errors.New("Failed to unmarshal message header, err message base is nil")
	}
	tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), header.Base.MsgType)
	if err != nil {
		return nil, fmt.Errorf("Failed to unmarshal tsMsg, err %s", err.Error())
	}

	// set msg info to tsMsg
	tsMsg.SetPosition(&MsgPosition{
		ChannelName: filepath.Base(msg.Topic()),
		MsgID:       msg.ID().Serialize(),
	})

	return tsMsg, nil
}

X
Xiangyu Wang 已提交
337
func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
Y
yukun 已提交
338 339 340 341 342 343 344 345 346 347 348
	defer ms.wait.Done()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				return
			}
			consumer.Ack(msg)
349 350

			tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
Y
yukun 已提交
351
			if err != nil {
352
				log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
Y
yukun 已提交
353 354
				continue
			}
N
neza2017 已提交
355 356 357 358 359 360 361
			pos := tsMsg.Position()
			tsMsg.SetPosition(&MsgPosition{
				ChannelName: pos.ChannelName,
				MsgID:       pos.MsgID,
				MsgGroup:    consumer.Subscription(),
				Timestamp:   tsMsg.BeginTs(),
			})
Y
yukun 已提交
362

X
Xiangyu Wang 已提交
363
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
X
Xiangyu Wang 已提交
364 365 366 367
			if ok {
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
			}

G
godchen 已提交
368 369 370 371 372
			msgPack := MsgPack{
				Msgs:           []TsMsg{tsMsg},
				StartPositions: []*internalpb.MsgPosition{tsMsg.Position()},
				EndPositions:   []*internalpb.MsgPosition{tsMsg.Position()},
			}
Y
yukun 已提交
373 374 375 376 377 378 379
			ms.receiveBuf <- &msgPack

			sp.Finish()
		}
	}
}

X
Xiangyu Wang 已提交
380
func (ms *mqMsgStream) Chan() <-chan *MsgPack {
Y
yukun 已提交
381 382 383
	return ms.receiveBuf
}

C
Cai Yudong 已提交
384 385 386 387 388 389
func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
	for _, mp := range msgPositions {
		consumer, ok := ms.consumers[mp.ChannelName]
		if !ok {
			return fmt.Errorf("channel %s not subscribed", mp.ChannelName)
		}
X
xige-16 已提交
390
		messageID, err := ms.client.BytesToMsgID(mp.MsgID)
Y
yukun 已提交
391 392 393 394 395 396 397
		if err != nil {
			return err
		}
		err = consumer.Seek(messageID)
		if err != nil {
			return err
		}
G
godchen 已提交
398 399 400 401 402 403 404 405 406 407 408 409
		msg, ok := <-consumer.Chan()
		if !ok {
			return errors.New("consumer closed")
		}
		consumer.Ack(msg)

		if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
			err = fmt.Errorf("seek msg not correct")
			log.Error("msMsgStream seek", zap.Error(err))
		}

		return nil
Y
yukun 已提交
410
	}
C
Cai Yudong 已提交
411
	return nil
Y
yukun 已提交
412 413
}

X
Xiangyu Wang 已提交
414 415
type MqTtMsgStream struct {
	mqMsgStream
416 417 418 419 420 421 422 423 424
	chanMsgBuf         map[mqclient.Consumer][]TsMsg
	chanMsgPos         map[mqclient.Consumer]*internalpb.MsgPosition
	chanStopChan       map[mqclient.Consumer]chan bool
	chanTtMsgTime      map[mqclient.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
Y
yukun 已提交
425 426
}

X
Xiangyu Wang 已提交
427
func NewMqTtMsgStream(ctx context.Context,
Y
yukun 已提交
428 429
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
430 431 432
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error) {
	msgStream, err := NewMqMsgStream(ctx, receiveBufSize, bufSize, client, unmarshal)
Y
yukun 已提交
433 434 435
	if err != nil {
		return nil, err
	}
436 437 438 439
	chanMsgBuf := make(map[mqclient.Consumer][]TsMsg)
	chanMsgPos := make(map[mqclient.Consumer]*internalpb.MsgPosition)
	chanStopChan := make(map[mqclient.Consumer]chan bool)
	chanTtMsgTime := make(map[mqclient.Consumer]Timestamp)
Y
yukun 已提交
440 441
	syncConsumer := make(chan int, 1)

X
Xiangyu Wang 已提交
442
	return &MqTtMsgStream{
443 444 445 446 447 448 449 450 451
		mqMsgStream:        *msgStream,
		chanMsgBuf:         chanMsgBuf,
		chanMsgPos:         chanMsgPos,
		chanStopChan:       chanStopChan,
		chanTtMsgTime:      chanTtMsgTime,
		chanMsgBufMutex:    &sync.Mutex{},
		chanTtMsgTimeMutex: &sync.RWMutex{},
		chanWaitGroup:      &sync.WaitGroup{},
		syncConsumer:       syncConsumer,
Y
yukun 已提交
452 453 454
	}, nil
}

X
Xiangyu Wang 已提交
455
func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string) {
Y
yukun 已提交
456 457 458 459 460
	if len(ms.consumers) == 0 {
		ms.syncConsumer <- 1
	}
	ms.consumers[channel] = consumer
	ms.consumerChannels = append(ms.consumerChannels, channel)
461 462
	ms.chanMsgBuf[consumer] = make([]TsMsg, 0)
	ms.chanMsgPos[consumer] = &internalpb.MsgPosition{
Y
yukun 已提交
463
		ChannelName: channel,
X
xige-16 已提交
464
		MsgID:       make([]byte, 0),
Y
yukun 已提交
465 466
		Timestamp:   ms.lastTimeStamp,
	}
467 468
	ms.chanStopChan[consumer] = make(chan bool)
	ms.chanTtMsgTime[consumer] = 0
Y
yukun 已提交
469 470
}

471 472
// AsConsumer subscribes channels as consumer for a MsgStream
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
Y
yukun 已提交
473 474 475 476 477
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
478 479
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
480 481
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
482 483
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496 497
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumerLock.Lock()
			ms.addConsumer(pc, channel)
			ms.consumerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
498
		err := Retry(10, time.Millisecond*200, fn)
Y
yukun 已提交
499 500 501 502 503 504 505
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
506
func (ms *MqTtMsgStream) Start() {
Y
yukun 已提交
507 508 509 510 511 512
	if ms.consumers != nil {
		ms.wait.Add(1)
		go ms.bufMsgPackToChannel()
	}
}

X
Xiangyu Wang 已提交
513
func (ms *MqTtMsgStream) Close() {
Y
yukun 已提交
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
	ms.streamCancel()
	close(ms.syncConsumer)
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

X
Xiangyu Wang 已提交
533
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Y
yukun 已提交
534
	defer ms.wait.Done()
535
	chanTtMsgSync := make(map[mqclient.Consumer]bool)
Y
yukun 已提交
536

537
	// block here until addConsumer
Y
yukun 已提交
538 539 540 541 542 543 544 545 546 547 548
	if _, ok := <-ms.syncConsumer; !ok {
		log.Debug("consumer closed!")
		return
	}

	for {
		select {
		case <-ms.ctx.Done():
			return
		default:
			ms.consumerLock.Lock()
549 550

			// wait all channels get ttMsg
Y
yukun 已提交
551
			for _, consumer := range ms.consumers {
552 553 554
				if !chanTtMsgSync[consumer] {
					ms.chanWaitGroup.Add(1)
					go ms.consumeToTtMsg(consumer)
Y
yukun 已提交
555 556
				}
			}
557 558 559 560 561
			ms.chanWaitGroup.Wait()

			// block here until all channels reach same timetick
			currTs, ok := ms.allChanReachSameTtMsg(chanTtMsgSync)
			if !ok || currTs <= ms.lastTimeStamp {
Y
yukun 已提交
562
				//log.Printf("All timeTick's timestamps are inconsistent")
563
				ms.consumerLock.Unlock()
Y
yukun 已提交
564 565
				continue
			}
566

Y
yukun 已提交
567 568 569
			timeTickBuf := make([]TsMsg, 0)
			startMsgPosition := make([]*internalpb.MsgPosition, 0)
			endMsgPositions := make([]*internalpb.MsgPosition, 0)
570 571
			ms.chanMsgBufMutex.Lock()
			for consumer, msgs := range ms.chanMsgBuf {
Y
yukun 已提交
572 573 574 575 576 577 578 579 580 581
				if len(msgs) == 0 {
					continue
				}
				tempBuffer := make([]TsMsg, 0)
				var timeTickMsg TsMsg
				for _, v := range msgs {
					if v.Type() == commonpb.MsgType_TimeTick {
						timeTickMsg = v
						continue
					}
582
					if v.EndTs() <= currTs {
Y
yukun 已提交
583
						timeTickBuf = append(timeTickBuf, v)
584
						//log.Debug("pack msg", zap.Uint64("curr", v.EndTs()), zap.Uint64("currTs", currTs))
Y
yukun 已提交
585 586 587 588
					} else {
						tempBuffer = append(tempBuffer, v)
					}
				}
589
				ms.chanMsgBuf[consumer] = tempBuffer
Y
yukun 已提交
590

591
				startMsgPosition = append(startMsgPosition, ms.chanMsgPos[consumer])
Y
yukun 已提交
592 593
				var newPos *internalpb.MsgPosition
				if len(tempBuffer) > 0 {
594
					// if tempBuffer is not empty, use tempBuffer[0] to seek
Y
yukun 已提交
595 596 597
					newPos = &internalpb.MsgPosition{
						ChannelName: tempBuffer[0].Position().ChannelName,
						MsgID:       tempBuffer[0].Position().MsgID,
598
						Timestamp:   currTs,
599
						MsgGroup:    consumer.Subscription(),
Y
yukun 已提交
600 601
					}
					endMsgPositions = append(endMsgPositions, newPos)
602 603
				} else if timeTickMsg != nil {
					// if tempBuffer is empty, use timeTickMsg to seek
Y
yukun 已提交
604 605 606
					newPos = &internalpb.MsgPosition{
						ChannelName: timeTickMsg.Position().ChannelName,
						MsgID:       timeTickMsg.Position().MsgID,
607
						Timestamp:   currTs,
608
						MsgGroup:    consumer.Subscription(),
Y
yukun 已提交
609 610 611
					}
					endMsgPositions = append(endMsgPositions, newPos)
				}
612
				ms.chanMsgPos[consumer] = newPos
Y
yukun 已提交
613
			}
614
			ms.chanMsgBufMutex.Unlock()
615
			ms.consumerLock.Unlock()
Y
yukun 已提交
616 617 618

			msgPack := MsgPack{
				BeginTs:        ms.lastTimeStamp,
619
				EndTs:          currTs,
Y
yukun 已提交
620 621 622 623 624
				Msgs:           timeTickBuf,
				StartPositions: startMsgPosition,
				EndPositions:   endMsgPositions,
			}

625
			//log.Debug("send msg pack", zap.Int("len", len(msgPack.Msgs)), zap.Uint64("currTs", currTs))
Y
yukun 已提交
626
			ms.receiveBuf <- &msgPack
627
			ms.lastTimeStamp = currTs
Y
yukun 已提交
628 629 630 631
		}
	}
}

632 633 634
// Save all msgs into chanMsgBuf[] till receive one ttMsg
func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqclient.Consumer) {
	defer ms.chanWaitGroup.Done()
Y
yukun 已提交
635 636 637 638
	for {
		select {
		case <-ms.ctx.Done():
			return
639 640
		case <-ms.chanStopChan[consumer]:
			return
Y
yukun 已提交
641 642 643 644 645 646 647
		case msg, ok := <-consumer.Chan():
			if !ok {
				log.Debug("consumer closed!")
				return
			}
			consumer.Ack(msg)

648
			tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
Y
yukun 已提交
649
			if err != nil {
650
				log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
Y
yukun 已提交
651 652 653
				continue
			}

X
Xiangyu Wang 已提交
654
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
Y
yukun 已提交
655
			if ok {
X
Xiangyu Wang 已提交
656
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
Y
yukun 已提交
657 658
			}

659 660 661
			ms.chanMsgBufMutex.Lock()
			ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
			ms.chanMsgBufMutex.Unlock()
Y
yukun 已提交
662

663 664 665 666
			if tsMsg.Type() == commonpb.MsgType_TimeTick {
				ms.chanTtMsgTimeMutex.Lock()
				ms.chanTtMsgTime[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
				ms.chanTtMsgTimeMutex.Unlock()
Y
yukun 已提交
667 668 669 670 671 672 673 674
				sp.Finish()
				return
			}
			sp.Finish()
		}
	}
}

675 676 677
// return true only when all channels reach same timetick
func (ms *MqTtMsgStream) allChanReachSameTtMsg(chanTtMsgSync map[mqclient.Consumer]bool) (Timestamp, bool) {
	tsMap := make(map[Timestamp]int)
Y
yukun 已提交
678
	var maxTime Timestamp = 0
679 680 681 682
	for _, t := range ms.chanTtMsgTime {
		tsMap[t]++
		if t > maxTime {
			maxTime = t
Y
yukun 已提交
683 684
		}
	}
685 686 687 688
	// when all channels reach same timetick, timeMap should contain only 1 timestamp
	if len(tsMap) <= 1 {
		for consumer := range ms.chanTtMsgTime {
			chanTtMsgSync[consumer] = false
Y
yukun 已提交
689 690 691
		}
		return maxTime, true
	}
692 693 694 695
	for consumer := range ms.chanTtMsgTime {
		ms.chanTtMsgTimeMutex.RLock()
		chanTtMsgSync[consumer] = (ms.chanTtMsgTime[consumer] == maxTime)
		ms.chanTtMsgTimeMutex.RUnlock()
Y
yukun 已提交
696 697 698 699 700
	}

	return 0, false
}

C
Cai Yudong 已提交
701 702
// Seek to the specified position
func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
X
Xiangyu Wang 已提交
703
	var consumer mqclient.Consumer
C
Cai Yudong 已提交
704
	var mp *MsgPosition
Y
yukun 已提交
705
	var err error
X
xige-16 已提交
706
	fn := func() error {
N
neza2017 已提交
707 708 709 710
		var ok bool
		consumer, ok = ms.consumers[mp.ChannelName]
		if !ok {
			return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName)
X
xige-16 已提交
711
		}
N
neza2017 已提交
712

X
xige-16 已提交
713
		if consumer == nil {
C
Cai Yudong 已提交
714
			return fmt.Errorf("consumer is nil")
X
xige-16 已提交
715
		}
Y
yukun 已提交
716

X
xige-16 已提交
717 718 719 720 721 722 723 724 725 726 727
		seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
		if err != nil {
			return err
		}
		err = consumer.Seek(seekMsgID)
		if err != nil {
			return err
		}

		return nil
	}
Y
yukun 已提交
728

C
Cai Yudong 已提交
729 730
	ms.consumerLock.Lock()
	defer ms.consumerLock.Unlock()
Y
yukun 已提交
731

C
Cai Yudong 已提交
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774
	for idx := range msgPositions {
		mp = msgPositions[idx]
		if len(mp.MsgID) == 0 {
			return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
		}
		if err = Retry(20, time.Millisecond*200, fn); err != nil {
			return fmt.Errorf("Failed to seek, error %s", err.Error())
		}
		ms.addConsumer(consumer, mp.ChannelName)

		//TODO: May cause problem
		//if len(consumer.Chan()) == 0 {
		//	return nil
		//}

		runLoop := true
		for runLoop {
			select {
			case <-ms.ctx.Done():
				return nil
			case msg, ok := <-consumer.Chan():
				if !ok {
					return fmt.Errorf("consumer closed")
				}
				consumer.Ack(msg)

				headerMsg := commonpb.MsgHeader{}
				err := proto.Unmarshal(msg.Payload(), &headerMsg)
				if err != nil {
					return fmt.Errorf("Failed to unmarshal message header, err %s", err.Error())
				}
				tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
				if err != nil {
					return fmt.Errorf("Failed to unmarshal tsMsg, err %s", err.Error())
				}
				if tsMsg.Type() == commonpb.MsgType_TimeTick && tsMsg.BeginTs() >= mp.Timestamp {
					runLoop = false
					break
				} else if tsMsg.BeginTs() > mp.Timestamp {
					tsMsg.SetPosition(&MsgPosition{
						ChannelName: filepath.Base(msg.Topic()),
						MsgID:       msg.ID().Serialize(),
					})
775
					ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
Y
yukun 已提交
776 777 778 779
				}
			}
		}
	}
C
Cai Yudong 已提交
780
	return nil
Y
yukun 已提交
781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
}

// TODO: test InMemMsgStream
/*
type InMemMsgStream struct {
	buffer chan *MsgPack
}

func (ms *InMemMsgStream) Start() {}
func (ms *InMemMsgStream) Close() {}

func (ms *InMemMsgStream) ProduceOne(msg TsMsg) error {
	msgPack := MsgPack{}
	msgPack.BeginTs = msg.BeginTs()
	msgPack.EndTs = msg.EndTs()
	msgPack.Msgs = append(msgPack.Msgs, msg)
	buffer <- &msgPack
	return nil
}

func (ms *InMemMsgStream) Produce(msgPack *MsgPack) error {
	buffer <- msgPack
	return nil
}

func (ms *InMemMsgStream) Broadcast(msgPack *MsgPack) error {
	return ms.Produce(msgPack)
}

func (ms *InMemMsgStream) Consume() *MsgPack {
	select {
	case msgPack := <-ms.buffer:
		return msgPack
	}
}

func (ms *InMemMsgStream) Chan() <- chan *MsgPack {
	return buffer
}
*/