mq_msgstream.go 19.4 KB
Newer Older
X
xige-16 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package msgstream
Y
yukun 已提交
13 14 15 16 17 18 19 20 21

import (
	"context"
	"errors"
	"path/filepath"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
X
Xiangyu Wang 已提交
22 23 24 25 26
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
27
	"github.com/opentracing/opentracing-go"
Y
yukun 已提交
28 29 30
	"go.uber.org/zap"
)

X
Xiangyu Wang 已提交
31
type mqMsgStream struct {
Y
yukun 已提交
32
	ctx              context.Context
X
Xiangyu Wang 已提交
33 34
	client           mqclient.Client
	producers        map[string]mqclient.Producer
Y
yukun 已提交
35
	producerChannels []string
X
Xiangyu Wang 已提交
36
	consumers        map[string]mqclient.Consumer
Y
yukun 已提交
37 38 39 40 41 42 43 44 45 46 47
	consumerChannels []string
	repackFunc       RepackFunc
	unmarshal        UnmarshalDispatcher
	receiveBuf       chan *MsgPack
	wait             *sync.WaitGroup
	streamCancel     func()
	bufSize          int64
	producerLock     *sync.Mutex
	consumerLock     *sync.Mutex
}

X
Xiangyu Wang 已提交
48
func NewMqMsgStream(ctx context.Context,
Y
yukun 已提交
49 50
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
51 52
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*mqMsgStream, error) {
Y
yukun 已提交
53 54

	streamCtx, streamCancel := context.WithCancel(ctx)
X
Xiangyu Wang 已提交
55 56
	producers := make(map[string]mqclient.Producer)
	consumers := make(map[string]mqclient.Consumer)
Y
yukun 已提交
57 58 59 60
	producerChannels := make([]string, 0)
	consumerChannels := make([]string, 0)
	receiveBuf := make(chan *MsgPack, receiveBufSize)

X
Xiangyu Wang 已提交
61
	stream := &mqMsgStream{
Y
yukun 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
		ctx:              streamCtx,
		client:           client,
		producers:        producers,
		producerChannels: producerChannels,
		consumers:        consumers,
		consumerChannels: consumerChannels,
		unmarshal:        unmarshal,
		bufSize:          bufSize,
		receiveBuf:       receiveBuf,
		streamCancel:     streamCancel,
		producerLock:     &sync.Mutex{},
		consumerLock:     &sync.Mutex{},
		wait:             &sync.WaitGroup{},
	}

	return stream, nil
}

X
Xiangyu Wang 已提交
80
func (ms *mqMsgStream) AsProducer(channels []string) {
Y
yukun 已提交
81 82
	for _, channel := range channels {
		fn := func() error {
X
Xiangyu Wang 已提交
83
			pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel})
Y
yukun 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96
			if err != nil {
				return err
			}
			if pp == nil {
				return errors.New("Producer is nil")
			}

			ms.producerLock.Lock()
			ms.producers[channel] = pp
			ms.producerChannels = append(ms.producerChannels, channel)
			ms.producerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
97
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
98 99 100 101 102 103 104
		if err != nil {
			errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
105
func (ms *mqMsgStream) AsConsumer(channels []string,
Y
yukun 已提交
106 107 108 109 110 111
	subName string) {
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
112 113
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
114 115
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
116 117
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumers[channel] = pc
			ms.consumerChannels = append(ms.consumerChannels, channel)
			ms.wait.Add(1)
			go ms.receiveMsg(pc)
			return nil
		}
X
Xiangyu Wang 已提交
133
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
134 135 136 137 138 139 140
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
141
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
Y
yukun 已提交
142 143 144
	ms.repackFunc = repackFunc
}

X
Xiangyu Wang 已提交
145
func (ms *mqMsgStream) Start() {
Y
yukun 已提交
146 147
}

X
Xiangyu Wang 已提交
148
func (ms *mqMsgStream) Close() {
Y
yukun 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
	ms.streamCancel()
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

167
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
Y
yukun 已提交
168 169 170 171
	if len(tsMsgs) <= 0 {
		return nil
	}
	reBucketValues := make([][]int32, len(tsMsgs))
172
	channelNum := uint32(len(ms.producerChannels))
173

174 175 176
	if channelNum == 0 {
		return nil
	}
Y
yukun 已提交
177 178 179 180
	for idx, tsMsg := range tsMsgs {
		hashValues := tsMsg.HashKeys()
		bucketValues := make([]int32, len(hashValues))
		for index, hashValue := range hashValues {
181
			bucketValues[index] = int32(hashValue % channelNum)
Y
yukun 已提交
182 183 184
		}
		reBucketValues[idx] = bucketValues
	}
185 186
	return reBucketValues
}
Y
yukun 已提交
187

188 189 190 191
// GetProduceChannels returns the channel names registered through AsProducer.
// The returned slice is the stream's own slice, not a copy.
func (ms *mqMsgStream) GetProduceChannels() []string {
	return ms.producerChannels
}

192 193 194 195 196 197 198 199 200 201
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
	tsMsgs := msgPack.Msgs
	if len(tsMsgs) <= 0 {
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
	if len(ms.producers) <= 0 {
		return errors.New("nil producer in msg stream")
	}
	reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
Y
yukun 已提交
202 203 204 205 206 207 208 209
	var result map[int32]*MsgPack
	var err error
	if ms.repackFunc != nil {
		result, err = ms.repackFunc(tsMsgs, reBucketValues)
	} else {
		msgType := (tsMsgs[0]).Type()
		switch msgType {
		case commonpb.MsgType_Insert:
X
Xiangyu Wang 已提交
210
			result, err = InsertRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
211
		case commonpb.MsgType_Delete:
X
Xiangyu Wang 已提交
212
			result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
213
		default:
X
Xiangyu Wang 已提交
214
			result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
215 216 217 218 219 220 221 222
		}
	}
	if err != nil {
		return err
	}
	for k, v := range result {
		channel := ms.producerChannels[k]
		for i := 0; i < len(v.Msgs); i++ {
X
Xiangyu Wang 已提交
223
			sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
X
Xiangyu Wang 已提交
224

Y
yukun 已提交
225 226 227 228 229
			mb, err := v.Msgs[i].Marshal(v.Msgs[i])
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
230
			m, err := ConvertToByteArray(mb)
Y
yukun 已提交
231 232 233 234
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
235
			msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252

			trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

			if err := ms.producers[channel].Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
			sp.Finish()
		}
	}
	return nil
}

X
Xiangyu Wang 已提交
253
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
Y
yukun 已提交
254
	for _, v := range msgPack.Msgs {
X
Xiangyu Wang 已提交
255
		sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
X
Xiangyu Wang 已提交
256

Y
yukun 已提交
257 258 259 260 261
		mb, err := v.Marshal(v)
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
262
		m, err := ConvertToByteArray(mb)
Y
yukun 已提交
263 264 265 266
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
267
		msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287

		trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

		ms.producerLock.Lock()
		for _, producer := range ms.producers {
			if err := producer.Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
		}
		ms.producerLock.Unlock()
		sp.Finish()
	}
	return nil
}

X
Xiangyu Wang 已提交
288
func (ms *mqMsgStream) Consume() *MsgPack {
Y
yukun 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
	for {
		select {
		case cm, ok := <-ms.receiveBuf:
			if !ok {
				log.Debug("buf chan closed")
				return nil
			}
			return cm
		case <-ms.ctx.Done():
			//log.Debug("context closed")
			return nil
		}
	}
}

X
Xiangyu Wang 已提交
304
func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
Y
yukun 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
	defer ms.wait.Done()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				return
			}
			consumer.Ack(msg)
			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
				continue
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
				continue
			}

X
Xiangyu Wang 已提交
328
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
X
Xiangyu Wang 已提交
329 330 331 332
			if ok {
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
			}

X
Xiangyu Wang 已提交
333
			tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
334 335
				ChannelName: filepath.Base(msg.Topic()),
				//FIXME
X
xige-16 已提交
336
				MsgID: msg.ID().Serialize(),
Y
yukun 已提交
337 338 339 340 341 342 343 344 345 346
			})

			msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
			ms.receiveBuf <- &msgPack

			sp.Finish()
		}
	}
}

X
Xiangyu Wang 已提交
347
func (ms *mqMsgStream) Chan() <-chan *MsgPack {
Y
yukun 已提交
348 349 350
	return ms.receiveBuf
}

X
Xiangyu Wang 已提交
351
func (ms *mqMsgStream) Seek(mp *internalpb.MsgPosition) error {
Y
yukun 已提交
352 353
	if _, ok := ms.consumers[mp.ChannelName]; ok {
		consumer := ms.consumers[mp.ChannelName]
X
xige-16 已提交
354
		messageID, err := ms.client.BytesToMsgID(mp.MsgID)
Y
yukun 已提交
355 356 357 358 359 360 361 362 363 364 365 366 367
		if err != nil {
			return err
		}
		err = consumer.Seek(messageID)
		if err != nil {
			return err
		}
		return nil
	}

	return errors.New("msgStream seek fail")
}

X
Xiangyu Wang 已提交
368 369 370 371
type MqTtMsgStream struct {
	mqMsgStream
	unsolvedBuf     map[mqclient.Consumer][]TsMsg
	msgPositions    map[mqclient.Consumer]*internalpb.MsgPosition
Y
yukun 已提交
372 373 374
	unsolvedMutex   *sync.Mutex
	lastTimeStamp   Timestamp
	syncConsumer    chan int
X
Xiangyu Wang 已提交
375
	stopConsumeChan map[mqclient.Consumer]chan bool
Y
yukun 已提交
376 377
}

X
Xiangyu Wang 已提交
378
func NewMqTtMsgStream(ctx context.Context,
Y
yukun 已提交
379 380
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
381 382 383
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error) {
	msgStream, err := NewMqMsgStream(ctx, receiveBufSize, bufSize, client, unmarshal)
Y
yukun 已提交
384 385 386
	if err != nil {
		return nil, err
	}
X
Xiangyu Wang 已提交
387 388 389
	unsolvedBuf := make(map[mqclient.Consumer][]TsMsg)
	stopChannel := make(map[mqclient.Consumer]chan bool)
	msgPositions := make(map[mqclient.Consumer]*internalpb.MsgPosition)
Y
yukun 已提交
390 391
	syncConsumer := make(chan int, 1)

X
Xiangyu Wang 已提交
392 393
	return &MqTtMsgStream{
		mqMsgStream:     *msgStream,
Y
yukun 已提交
394 395 396 397 398 399 400 401
		unsolvedBuf:     unsolvedBuf,
		msgPositions:    msgPositions,
		unsolvedMutex:   &sync.Mutex{},
		syncConsumer:    syncConsumer,
		stopConsumeChan: stopChannel,
	}, nil
}

X
Xiangyu Wang 已提交
402
func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string) {
Y
yukun 已提交
403 404 405 406 407 408 409 410
	if len(ms.consumers) == 0 {
		ms.syncConsumer <- 1
	}
	ms.consumers[channel] = consumer
	ms.unsolvedBuf[consumer] = make([]TsMsg, 0)
	ms.consumerChannels = append(ms.consumerChannels, channel)
	ms.msgPositions[consumer] = &internalpb.MsgPosition{
		ChannelName: channel,
X
xige-16 已提交
411
		MsgID:       make([]byte, 0),
Y
yukun 已提交
412 413 414 415 416 417
		Timestamp:   ms.lastTimeStamp,
	}
	stopConsumeChan := make(chan bool)
	ms.stopConsumeChan[consumer] = stopConsumeChan
}

X
Xiangyu Wang 已提交
418
func (ms *MqTtMsgStream) AsConsumer(channels []string,
Y
yukun 已提交
419 420 421 422 423 424
	subName string) {
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
425 426
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
427 428
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
429 430
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
431 432 433 434 435 436 437 438 439 440 441 442 443 444
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumerLock.Lock()
			ms.addConsumer(pc, channel)
			ms.consumerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
445
		err := Retry(10, time.Millisecond*200, fn)
Y
yukun 已提交
446 447 448 449 450 451 452
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
453
func (ms *MqTtMsgStream) Start() {
Y
yukun 已提交
454 455 456 457 458 459
	if ms.consumers != nil {
		ms.wait.Add(1)
		go ms.bufMsgPackToChannel()
	}
}

X
Xiangyu Wang 已提交
460
func (ms *MqTtMsgStream) Close() {
Y
yukun 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
	ms.streamCancel()
	close(ms.syncConsumer)
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

X
Xiangyu Wang 已提交
480
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Y
yukun 已提交
481
	defer ms.wait.Done()
X
Xiangyu Wang 已提交
482 483 484
	ms.unsolvedBuf = make(map[mqclient.Consumer][]TsMsg)
	isChannelReady := make(map[mqclient.Consumer]bool)
	eofMsgTimeStamp := make(map[mqclient.Consumer]Timestamp)
Y
yukun 已提交
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570

	if _, ok := <-ms.syncConsumer; !ok {
		log.Debug("consumer closed!")
		return
	}

	for {
		select {
		case <-ms.ctx.Done():
			return
		default:
			wg := sync.WaitGroup{}
			findMapMutex := sync.RWMutex{}
			ms.consumerLock.Lock()
			for _, consumer := range ms.consumers {
				if isChannelReady[consumer] {
					continue
				}
				wg.Add(1)
				go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex)
			}
			ms.consumerLock.Unlock()
			wg.Wait()
			timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
			if !ok || timeStamp <= ms.lastTimeStamp {
				//log.Printf("All timeTick's timestamps are inconsistent")
				continue
			}
			timeTickBuf := make([]TsMsg, 0)
			startMsgPosition := make([]*internalpb.MsgPosition, 0)
			endMsgPositions := make([]*internalpb.MsgPosition, 0)
			ms.unsolvedMutex.Lock()
			for consumer, msgs := range ms.unsolvedBuf {
				if len(msgs) == 0 {
					continue
				}
				tempBuffer := make([]TsMsg, 0)
				var timeTickMsg TsMsg
				for _, v := range msgs {
					if v.Type() == commonpb.MsgType_TimeTick {
						timeTickMsg = v
						continue
					}
					if v.EndTs() <= timeStamp {
						timeTickBuf = append(timeTickBuf, v)
					} else {
						tempBuffer = append(tempBuffer, v)
					}
				}
				ms.unsolvedBuf[consumer] = tempBuffer

				startMsgPosition = append(startMsgPosition, ms.msgPositions[consumer])
				var newPos *internalpb.MsgPosition
				if len(tempBuffer) > 0 {
					newPos = &internalpb.MsgPosition{
						ChannelName: tempBuffer[0].Position().ChannelName,
						MsgID:       tempBuffer[0].Position().MsgID,
						Timestamp:   timeStamp,
					}
					endMsgPositions = append(endMsgPositions, newPos)
				} else {
					newPos = &internalpb.MsgPosition{
						ChannelName: timeTickMsg.Position().ChannelName,
						MsgID:       timeTickMsg.Position().MsgID,
						Timestamp:   timeStamp,
					}
					endMsgPositions = append(endMsgPositions, newPos)
				}
				ms.msgPositions[consumer] = newPos
			}
			ms.unsolvedMutex.Unlock()

			msgPack := MsgPack{
				BeginTs:        ms.lastTimeStamp,
				EndTs:          timeStamp,
				Msgs:           timeTickBuf,
				StartPositions: startMsgPosition,
				EndPositions:   endMsgPositions,
			}

			ms.receiveBuf <- &msgPack
			ms.lastTimeStamp = timeStamp
		}
	}
}

X
Xiangyu Wang 已提交
571 572
func (ms *MqTtMsgStream) findTimeTick(consumer mqclient.Consumer,
	eofMsgMap map[mqclient.Consumer]Timestamp,
Y
yukun 已提交
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
	wg *sync.WaitGroup,
	findMapMutex *sync.RWMutex) {
	defer wg.Done()
	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				log.Debug("consumer closed!")
				return
			}
			consumer.Ack(msg)

			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
				continue
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
				continue
			}

			// set msg info to tsMsg
X
Xiangyu Wang 已提交
600
			tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
601
				ChannelName: filepath.Base(msg.Topic()),
X
xige-16 已提交
602
				MsgID:       msg.ID().Serialize(),
Y
yukun 已提交
603 604
			})

X
Xiangyu Wang 已提交
605
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
Y
yukun 已提交
606
			if ok {
X
Xiangyu Wang 已提交
607
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
Y
yukun 已提交
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
			}

			ms.unsolvedMutex.Lock()
			ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
			ms.unsolvedMutex.Unlock()

			if headerMsg.Base.MsgType == commonpb.MsgType_TimeTick {
				findMapMutex.Lock()
				eofMsgMap[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
				findMapMutex.Unlock()
				sp.Finish()
				return
			}
			sp.Finish()
		case <-ms.stopConsumeChan[consumer]:
			return
		}
	}
}

X
Xiangyu Wang 已提交
628 629
func checkTimeTickMsg(msg map[mqclient.Consumer]Timestamp,
	isChannelReady map[mqclient.Consumer]bool,
Y
yukun 已提交
630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658
	mu *sync.RWMutex) (Timestamp, bool) {
	checkMap := make(map[Timestamp]int)
	var maxTime Timestamp = 0
	for _, v := range msg {
		checkMap[v]++
		if v > maxTime {
			maxTime = v
		}
	}
	if len(checkMap) <= 1 {
		for consumer := range msg {
			isChannelReady[consumer] = false
		}
		return maxTime, true
	}
	for consumer := range msg {
		mu.RLock()
		v := msg[consumer]
		mu.RUnlock()
		if v != maxTime {
			isChannelReady[consumer] = false
		} else {
			isChannelReady[consumer] = true
		}
	}

	return 0, false
}

X
Xiangyu Wang 已提交
659
func (ms *MqTtMsgStream) Seek(mp *internalpb.MsgPosition) error {
Y
yukun 已提交
660 661 662
	if len(mp.MsgID) == 0 {
		return errors.New("when msgID's length equal to 0, please use AsConsumer interface")
	}
X
Xiangyu Wang 已提交
663
	var consumer mqclient.Consumer
Y
yukun 已提交
664 665 666 667 668 669 670 671 672
	var err error
	var hasWatched bool
	seekChannel := mp.ChannelName
	subName := mp.MsgGroup
	ms.consumerLock.Lock()
	defer ms.consumerLock.Unlock()
	consumer, hasWatched = ms.consumers[seekChannel]

	if hasWatched {
673
		return errors.New("the channel should has not been subscribed")
Y
yukun 已提交
674 675
	}

X
xige-16 已提交
676
	fn := func() error {
X
Xiangyu Wang 已提交
677 678
		receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
		consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{
X
xige-16 已提交
679 680
			Topic:                       seekChannel,
			SubscriptionName:            subName,
X
Xiangyu Wang 已提交
681 682
			SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
			Type:                        mqclient.KeyShared,
X
xige-16 已提交
683 684 685 686 687 688 689 690 691 692
			MessageChannel:              receiveChannel,
		})
		if err != nil {
			return err
		}
		if consumer == nil {
			err = errors.New("consumer is nil")
			log.Debug("subscribe error", zap.String("error = ", err.Error()))
			return err
		}
Y
yukun 已提交
693

X
xige-16 已提交
694 695 696 697 698 699 700 701 702 703 704 705 706
		seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
		if err != nil {
			log.Debug("convert messageID error", zap.String("error = ", err.Error()))
			return err
		}
		err = consumer.Seek(seekMsgID)
		if err != nil {
			log.Debug("seek error ", zap.String("error = ", err.Error()))
			return err
		}

		return nil
	}
X
Xiangyu Wang 已提交
707
	err = Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
708
	if err != nil {
X
xige-16 已提交
709 710
		errMsg := "Failed to seek, error = " + err.Error()
		panic(errMsg)
Y
yukun 已提交
711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744
	}
	ms.addConsumer(consumer, seekChannel)

	//TODO: May cause problem
	if len(consumer.Chan()) == 0 {
		return nil
	}

	for {
		select {
		case <-ms.ctx.Done():
			return nil
		case msg, ok := <-consumer.Chan():
			if !ok {
				return errors.New("consumer closed")
			}
			consumer.Ack(msg)

			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
			}
			if tsMsg.Type() == commonpb.MsgType_TimeTick {
				if tsMsg.BeginTs() >= mp.Timestamp {
					return nil
				}
				continue
			}
			if tsMsg.BeginTs() > mp.Timestamp {
X
Xiangyu Wang 已提交
745
				tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
746
					ChannelName: filepath.Base(msg.Topic()),
X
xige-16 已提交
747
					MsgID:       msg.ID().Serialize(),
Y
yukun 已提交
748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792
				})
				ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
			}
		}
	}
}

//TODO test InMemMsgStream
/*
type InMemMsgStream struct {
	buffer chan *MsgPack
}

func (ms *InMemMsgStream) Start() {}
func (ms *InMemMsgStream) Close() {}

func (ms *InMemMsgStream) ProduceOne(msg TsMsg) error {
	msgPack := MsgPack{}
	msgPack.BeginTs = msg.BeginTs()
	msgPack.EndTs = msg.EndTs()
	msgPack.Msgs = append(msgPack.Msgs, msg)
	buffer <- &msgPack
	return nil
}

func (ms *InMemMsgStream) Produce(msgPack *MsgPack) error {
	buffer <- msgPack
	return nil
}

func (ms *InMemMsgStream) Broadcast(msgPack *MsgPack) error {
	return ms.Produce(msgPack)
}

func (ms *InMemMsgStream) Consume() *MsgPack {
	select {
	case msgPack := <-ms.buffer:
		return msgPack
	}
}

func (ms *InMemMsgStream) Chan() <- chan *MsgPack {
	return buffer
}
*/