mq_msgstream.go 19.3 KB
Newer Older
X
xige-16 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package msgstream
Y
yukun 已提交
13 14 15 16 17 18 19 20 21

import (
	"context"
	"errors"
	"path/filepath"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
X
Xiangyu Wang 已提交
22 23 24 25 26
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
27
	"github.com/opentracing/opentracing-go"
Y
yukun 已提交
28 29 30
	"go.uber.org/zap"
)

X
Xiangyu Wang 已提交
31
type mqMsgStream struct {
Y
yukun 已提交
32
	ctx              context.Context
X
Xiangyu Wang 已提交
33 34
	client           mqclient.Client
	producers        map[string]mqclient.Producer
Y
yukun 已提交
35
	producerChannels []string
X
Xiangyu Wang 已提交
36
	consumers        map[string]mqclient.Consumer
Y
yukun 已提交
37 38 39 40 41 42 43 44 45 46 47
	consumerChannels []string
	repackFunc       RepackFunc
	unmarshal        UnmarshalDispatcher
	receiveBuf       chan *MsgPack
	wait             *sync.WaitGroup
	streamCancel     func()
	bufSize          int64
	producerLock     *sync.Mutex
	consumerLock     *sync.Mutex
}

X
Xiangyu Wang 已提交
48
func NewMqMsgStream(ctx context.Context,
Y
yukun 已提交
49 50
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
51 52
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*mqMsgStream, error) {
Y
yukun 已提交
53 54

	streamCtx, streamCancel := context.WithCancel(ctx)
X
Xiangyu Wang 已提交
55 56
	producers := make(map[string]mqclient.Producer)
	consumers := make(map[string]mqclient.Consumer)
Y
yukun 已提交
57 58 59 60
	producerChannels := make([]string, 0)
	consumerChannels := make([]string, 0)
	receiveBuf := make(chan *MsgPack, receiveBufSize)

X
Xiangyu Wang 已提交
61
	stream := &mqMsgStream{
Y
yukun 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
		ctx:              streamCtx,
		client:           client,
		producers:        producers,
		producerChannels: producerChannels,
		consumers:        consumers,
		consumerChannels: consumerChannels,
		unmarshal:        unmarshal,
		bufSize:          bufSize,
		receiveBuf:       receiveBuf,
		streamCancel:     streamCancel,
		producerLock:     &sync.Mutex{},
		consumerLock:     &sync.Mutex{},
		wait:             &sync.WaitGroup{},
	}

	return stream, nil
}

X
Xiangyu Wang 已提交
80
func (ms *mqMsgStream) AsProducer(channels []string) {
Y
yukun 已提交
81 82
	for _, channel := range channels {
		fn := func() error {
X
Xiangyu Wang 已提交
83
			pp, err := ms.client.CreateProducer(mqclient.ProducerOptions{Topic: channel})
Y
yukun 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96
			if err != nil {
				return err
			}
			if pp == nil {
				return errors.New("Producer is nil")
			}

			ms.producerLock.Lock()
			ms.producers[channel] = pp
			ms.producerChannels = append(ms.producerChannels, channel)
			ms.producerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
97
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
98 99 100 101 102 103 104
		if err != nil {
			errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
105
func (ms *mqMsgStream) AsConsumer(channels []string,
Y
yukun 已提交
106 107 108 109 110 111
	subName string) {
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
112 113
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
114 115
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
116 117
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumers[channel] = pc
			ms.consumerChannels = append(ms.consumerChannels, channel)
			ms.wait.Add(1)
			go ms.receiveMsg(pc)
			return nil
		}
X
Xiangyu Wang 已提交
133
		err := Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
134 135 136 137 138 139 140
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
141
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
Y
yukun 已提交
142 143 144
	ms.repackFunc = repackFunc
}

X
Xiangyu Wang 已提交
145
func (ms *mqMsgStream) Start() {
Y
yukun 已提交
146 147
}

X
Xiangyu Wang 已提交
148
func (ms *mqMsgStream) Close() {
Y
yukun 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
	ms.streamCancel()
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

167
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
Y
yukun 已提交
168 169 170 171
	if len(tsMsgs) <= 0 {
		return nil
	}
	reBucketValues := make([][]int32, len(tsMsgs))
172 173 174 175
	channelNum := uint32(len(ms.producerChannels))
	if channelNum == 0 {
		return nil
	}
Y
yukun 已提交
176 177 178 179
	for idx, tsMsg := range tsMsgs {
		hashValues := tsMsg.HashKeys()
		bucketValues := make([]int32, len(hashValues))
		for index, hashValue := range hashValues {
180
			bucketValues[index] = int32(hashValue % channelNum)
Y
yukun 已提交
181 182 183
		}
		reBucketValues[idx] = bucketValues
	}
184 185
	return reBucketValues
}
Y
yukun 已提交
186

187 188 189 190 191 192 193 194 195 196
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
	tsMsgs := msgPack.Msgs
	if len(tsMsgs) <= 0 {
		log.Debug("Warning: Receive empty msgPack")
		return nil
	}
	if len(ms.producers) <= 0 {
		return errors.New("nil producer in msg stream")
	}
	reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
Y
yukun 已提交
197 198 199 200 201 202 203 204
	var result map[int32]*MsgPack
	var err error
	if ms.repackFunc != nil {
		result, err = ms.repackFunc(tsMsgs, reBucketValues)
	} else {
		msgType := (tsMsgs[0]).Type()
		switch msgType {
		case commonpb.MsgType_Insert:
X
Xiangyu Wang 已提交
205
			result, err = InsertRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
206
		case commonpb.MsgType_Delete:
X
Xiangyu Wang 已提交
207
			result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
208
		default:
X
Xiangyu Wang 已提交
209
			result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
Y
yukun 已提交
210 211 212 213 214 215 216 217
		}
	}
	if err != nil {
		return err
	}
	for k, v := range result {
		channel := ms.producerChannels[k]
		for i := 0; i < len(v.Msgs); i++ {
X
Xiangyu Wang 已提交
218
			sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
X
Xiangyu Wang 已提交
219

Y
yukun 已提交
220 221 222 223 224
			mb, err := v.Msgs[i].Marshal(v.Msgs[i])
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
225
			m, err := ConvertToByteArray(mb)
Y
yukun 已提交
226 227 228 229
			if err != nil {
				return err
			}

X
Xiangyu Wang 已提交
230
			msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

			trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

			if err := ms.producers[channel].Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
			sp.Finish()
		}
	}
	return nil
}

X
Xiangyu Wang 已提交
248
func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
Y
yukun 已提交
249
	for _, v := range msgPack.Msgs {
X
Xiangyu Wang 已提交
250
		sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v)
X
Xiangyu Wang 已提交
251

Y
yukun 已提交
252 253 254 255 256
		mb, err := v.Marshal(v)
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
257
		m, err := ConvertToByteArray(mb)
Y
yukun 已提交
258 259 260 261
		if err != nil {
			return err
		}

X
Xiangyu Wang 已提交
262
		msg := &mqclient.ProducerMessage{Payload: m, Properties: map[string]string{}}
Y
yukun 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282

		trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)

		ms.producerLock.Lock()
		for _, producer := range ms.producers {
			if err := producer.Send(
				spanCtx,
				msg,
			); err != nil {
				trace.LogError(sp, err)
				sp.Finish()
				return err
			}
		}
		ms.producerLock.Unlock()
		sp.Finish()
	}
	return nil
}

X
Xiangyu Wang 已提交
283
func (ms *mqMsgStream) Consume() *MsgPack {
Y
yukun 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
	for {
		select {
		case cm, ok := <-ms.receiveBuf:
			if !ok {
				log.Debug("buf chan closed")
				return nil
			}
			return cm
		case <-ms.ctx.Done():
			//log.Debug("context closed")
			return nil
		}
	}
}

X
Xiangyu Wang 已提交
299
func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) {
Y
yukun 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
	defer ms.wait.Done()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				return
			}
			consumer.Ack(msg)
			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
				continue
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
				continue
			}

X
Xiangyu Wang 已提交
323
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
X
Xiangyu Wang 已提交
324 325 326 327
			if ok {
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
			}

X
Xiangyu Wang 已提交
328
			tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
329 330
				ChannelName: filepath.Base(msg.Topic()),
				//FIXME
X
xige-16 已提交
331
				MsgID: msg.ID().Serialize(),
Y
yukun 已提交
332 333 334 335 336 337 338 339 340 341
			})

			msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
			ms.receiveBuf <- &msgPack

			sp.Finish()
		}
	}
}

X
Xiangyu Wang 已提交
342
func (ms *mqMsgStream) Chan() <-chan *MsgPack {
Y
yukun 已提交
343 344 345
	return ms.receiveBuf
}

X
Xiangyu Wang 已提交
346
func (ms *mqMsgStream) Seek(mp *internalpb.MsgPosition) error {
Y
yukun 已提交
347 348
	if _, ok := ms.consumers[mp.ChannelName]; ok {
		consumer := ms.consumers[mp.ChannelName]
X
xige-16 已提交
349
		messageID, err := ms.client.BytesToMsgID(mp.MsgID)
Y
yukun 已提交
350 351 352 353 354 355 356 357 358 359 360 361 362
		if err != nil {
			return err
		}
		err = consumer.Seek(messageID)
		if err != nil {
			return err
		}
		return nil
	}

	return errors.New("msgStream seek fail")
}

X
Xiangyu Wang 已提交
363 364 365 366
type MqTtMsgStream struct {
	mqMsgStream
	unsolvedBuf     map[mqclient.Consumer][]TsMsg
	msgPositions    map[mqclient.Consumer]*internalpb.MsgPosition
Y
yukun 已提交
367 368 369
	unsolvedMutex   *sync.Mutex
	lastTimeStamp   Timestamp
	syncConsumer    chan int
X
Xiangyu Wang 已提交
370
	stopConsumeChan map[mqclient.Consumer]chan bool
Y
yukun 已提交
371 372
}

X
Xiangyu Wang 已提交
373
func NewMqTtMsgStream(ctx context.Context,
Y
yukun 已提交
374 375
	receiveBufSize int64,
	bufSize int64,
X
Xiangyu Wang 已提交
376 377 378
	client mqclient.Client,
	unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error) {
	msgStream, err := NewMqMsgStream(ctx, receiveBufSize, bufSize, client, unmarshal)
Y
yukun 已提交
379 380 381
	if err != nil {
		return nil, err
	}
X
Xiangyu Wang 已提交
382 383 384
	unsolvedBuf := make(map[mqclient.Consumer][]TsMsg)
	stopChannel := make(map[mqclient.Consumer]chan bool)
	msgPositions := make(map[mqclient.Consumer]*internalpb.MsgPosition)
Y
yukun 已提交
385 386
	syncConsumer := make(chan int, 1)

X
Xiangyu Wang 已提交
387 388
	return &MqTtMsgStream{
		mqMsgStream:     *msgStream,
Y
yukun 已提交
389 390 391 392 393 394 395 396
		unsolvedBuf:     unsolvedBuf,
		msgPositions:    msgPositions,
		unsolvedMutex:   &sync.Mutex{},
		syncConsumer:    syncConsumer,
		stopConsumeChan: stopChannel,
	}, nil
}

X
Xiangyu Wang 已提交
397
func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string) {
Y
yukun 已提交
398 399 400 401 402 403 404 405
	if len(ms.consumers) == 0 {
		ms.syncConsumer <- 1
	}
	ms.consumers[channel] = consumer
	ms.unsolvedBuf[consumer] = make([]TsMsg, 0)
	ms.consumerChannels = append(ms.consumerChannels, channel)
	ms.msgPositions[consumer] = &internalpb.MsgPosition{
		ChannelName: channel,
X
xige-16 已提交
406
		MsgID:       make([]byte, 0),
Y
yukun 已提交
407 408 409 410 411 412
		Timestamp:   ms.lastTimeStamp,
	}
	stopConsumeChan := make(chan bool)
	ms.stopConsumeChan[consumer] = stopConsumeChan
}

X
Xiangyu Wang 已提交
413
func (ms *MqTtMsgStream) AsConsumer(channels []string,
Y
yukun 已提交
414 415 416 417 418 419
	subName string) {
	for _, channel := range channels {
		if _, ok := ms.consumers[channel]; ok {
			continue
		}
		fn := func() error {
X
Xiangyu Wang 已提交
420 421
			receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
			pc, err := ms.client.Subscribe(mqclient.ConsumerOptions{
Y
yukun 已提交
422 423
				Topic:                       channel,
				SubscriptionName:            subName,
X
Xiangyu Wang 已提交
424 425
				Type:                        mqclient.KeyShared,
				SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
Y
yukun 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439
				MessageChannel:              receiveChannel,
			})
			if err != nil {
				return err
			}
			if pc == nil {
				return errors.New("Consumer is nil")
			}

			ms.consumerLock.Lock()
			ms.addConsumer(pc, channel)
			ms.consumerLock.Unlock()
			return nil
		}
X
Xiangyu Wang 已提交
440
		err := Retry(10, time.Millisecond*200, fn)
Y
yukun 已提交
441 442 443 444 445 446 447
		if err != nil {
			errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
			panic(errMsg)
		}
	}
}

X
Xiangyu Wang 已提交
448
func (ms *MqTtMsgStream) Start() {
Y
yukun 已提交
449 450 451 452 453 454
	if ms.consumers != nil {
		ms.wait.Add(1)
		go ms.bufMsgPackToChannel()
	}
}

X
Xiangyu Wang 已提交
455
func (ms *MqTtMsgStream) Close() {
Y
yukun 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
	ms.streamCancel()
	close(ms.syncConsumer)
	ms.wait.Wait()

	for _, producer := range ms.producers {
		if producer != nil {
			producer.Close()
		}
	}
	for _, consumer := range ms.consumers {
		if consumer != nil {
			consumer.Close()
		}
	}
	if ms.client != nil {
		ms.client.Close()
	}
}

X
Xiangyu Wang 已提交
475
func (ms *MqTtMsgStream) bufMsgPackToChannel() {
Y
yukun 已提交
476
	defer ms.wait.Done()
X
Xiangyu Wang 已提交
477 478 479
	ms.unsolvedBuf = make(map[mqclient.Consumer][]TsMsg)
	isChannelReady := make(map[mqclient.Consumer]bool)
	eofMsgTimeStamp := make(map[mqclient.Consumer]Timestamp)
Y
yukun 已提交
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565

	if _, ok := <-ms.syncConsumer; !ok {
		log.Debug("consumer closed!")
		return
	}

	for {
		select {
		case <-ms.ctx.Done():
			return
		default:
			wg := sync.WaitGroup{}
			findMapMutex := sync.RWMutex{}
			ms.consumerLock.Lock()
			for _, consumer := range ms.consumers {
				if isChannelReady[consumer] {
					continue
				}
				wg.Add(1)
				go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex)
			}
			ms.consumerLock.Unlock()
			wg.Wait()
			timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex)
			if !ok || timeStamp <= ms.lastTimeStamp {
				//log.Printf("All timeTick's timestamps are inconsistent")
				continue
			}
			timeTickBuf := make([]TsMsg, 0)
			startMsgPosition := make([]*internalpb.MsgPosition, 0)
			endMsgPositions := make([]*internalpb.MsgPosition, 0)
			ms.unsolvedMutex.Lock()
			for consumer, msgs := range ms.unsolvedBuf {
				if len(msgs) == 0 {
					continue
				}
				tempBuffer := make([]TsMsg, 0)
				var timeTickMsg TsMsg
				for _, v := range msgs {
					if v.Type() == commonpb.MsgType_TimeTick {
						timeTickMsg = v
						continue
					}
					if v.EndTs() <= timeStamp {
						timeTickBuf = append(timeTickBuf, v)
					} else {
						tempBuffer = append(tempBuffer, v)
					}
				}
				ms.unsolvedBuf[consumer] = tempBuffer

				startMsgPosition = append(startMsgPosition, ms.msgPositions[consumer])
				var newPos *internalpb.MsgPosition
				if len(tempBuffer) > 0 {
					newPos = &internalpb.MsgPosition{
						ChannelName: tempBuffer[0].Position().ChannelName,
						MsgID:       tempBuffer[0].Position().MsgID,
						Timestamp:   timeStamp,
					}
					endMsgPositions = append(endMsgPositions, newPos)
				} else {
					newPos = &internalpb.MsgPosition{
						ChannelName: timeTickMsg.Position().ChannelName,
						MsgID:       timeTickMsg.Position().MsgID,
						Timestamp:   timeStamp,
					}
					endMsgPositions = append(endMsgPositions, newPos)
				}
				ms.msgPositions[consumer] = newPos
			}
			ms.unsolvedMutex.Unlock()

			msgPack := MsgPack{
				BeginTs:        ms.lastTimeStamp,
				EndTs:          timeStamp,
				Msgs:           timeTickBuf,
				StartPositions: startMsgPosition,
				EndPositions:   endMsgPositions,
			}

			ms.receiveBuf <- &msgPack
			ms.lastTimeStamp = timeStamp
		}
	}
}

X
Xiangyu Wang 已提交
566 567
func (ms *MqTtMsgStream) findTimeTick(consumer mqclient.Consumer,
	eofMsgMap map[mqclient.Consumer]Timestamp,
Y
yukun 已提交
568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
	wg *sync.WaitGroup,
	findMapMutex *sync.RWMutex) {
	defer wg.Done()
	for {
		select {
		case <-ms.ctx.Done():
			return
		case msg, ok := <-consumer.Chan():
			if !ok {
				log.Debug("consumer closed!")
				return
			}
			consumer.Ack(msg)

			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
				continue
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
				continue
			}

			// set msg info to tsMsg
X
Xiangyu Wang 已提交
595
			tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
596
				ChannelName: filepath.Base(msg.Topic()),
X
xige-16 已提交
597
				MsgID:       msg.ID().Serialize(),
Y
yukun 已提交
598 599
			})

X
Xiangyu Wang 已提交
600
			sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties())
Y
yukun 已提交
601
			if ok {
X
Xiangyu Wang 已提交
602
				tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp))
Y
yukun 已提交
603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
			}

			ms.unsolvedMutex.Lock()
			ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
			ms.unsolvedMutex.Unlock()

			if headerMsg.Base.MsgType == commonpb.MsgType_TimeTick {
				findMapMutex.Lock()
				eofMsgMap[consumer] = tsMsg.(*TimeTickMsg).Base.Timestamp
				findMapMutex.Unlock()
				sp.Finish()
				return
			}
			sp.Finish()
		case <-ms.stopConsumeChan[consumer]:
			return
		}
	}
}

X
Xiangyu Wang 已提交
623 624
func checkTimeTickMsg(msg map[mqclient.Consumer]Timestamp,
	isChannelReady map[mqclient.Consumer]bool,
Y
yukun 已提交
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653
	mu *sync.RWMutex) (Timestamp, bool) {
	checkMap := make(map[Timestamp]int)
	var maxTime Timestamp = 0
	for _, v := range msg {
		checkMap[v]++
		if v > maxTime {
			maxTime = v
		}
	}
	if len(checkMap) <= 1 {
		for consumer := range msg {
			isChannelReady[consumer] = false
		}
		return maxTime, true
	}
	for consumer := range msg {
		mu.RLock()
		v := msg[consumer]
		mu.RUnlock()
		if v != maxTime {
			isChannelReady[consumer] = false
		} else {
			isChannelReady[consumer] = true
		}
	}

	return 0, false
}

X
Xiangyu Wang 已提交
654
func (ms *MqTtMsgStream) Seek(mp *internalpb.MsgPosition) error {
Y
yukun 已提交
655 656 657
	if len(mp.MsgID) == 0 {
		return errors.New("when msgID's length equal to 0, please use AsConsumer interface")
	}
X
Xiangyu Wang 已提交
658
	var consumer mqclient.Consumer
Y
yukun 已提交
659 660 661 662 663 664 665 666 667
	var err error
	var hasWatched bool
	seekChannel := mp.ChannelName
	subName := mp.MsgGroup
	ms.consumerLock.Lock()
	defer ms.consumerLock.Unlock()
	consumer, hasWatched = ms.consumers[seekChannel]

	if hasWatched {
668
		return errors.New("the channel should has not been subscribed")
Y
yukun 已提交
669 670
	}

X
xige-16 已提交
671
	fn := func() error {
X
Xiangyu Wang 已提交
672 673
		receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize)
		consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{
X
xige-16 已提交
674 675
			Topic:                       seekChannel,
			SubscriptionName:            subName,
X
Xiangyu Wang 已提交
676 677
			SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
			Type:                        mqclient.KeyShared,
X
xige-16 已提交
678 679 680 681 682 683 684 685 686 687
			MessageChannel:              receiveChannel,
		})
		if err != nil {
			return err
		}
		if consumer == nil {
			err = errors.New("consumer is nil")
			log.Debug("subscribe error", zap.String("error = ", err.Error()))
			return err
		}
Y
yukun 已提交
688

X
xige-16 已提交
689 690 691 692 693 694 695 696 697 698 699 700 701
		seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
		if err != nil {
			log.Debug("convert messageID error", zap.String("error = ", err.Error()))
			return err
		}
		err = consumer.Seek(seekMsgID)
		if err != nil {
			log.Debug("seek error ", zap.String("error = ", err.Error()))
			return err
		}

		return nil
	}
X
Xiangyu Wang 已提交
702
	err = Retry(20, time.Millisecond*200, fn)
Y
yukun 已提交
703
	if err != nil {
X
xige-16 已提交
704 705
		errMsg := "Failed to seek, error = " + err.Error()
		panic(errMsg)
Y
yukun 已提交
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
	}
	ms.addConsumer(consumer, seekChannel)

	//TODO: May cause problem
	if len(consumer.Chan()) == 0 {
		return nil
	}

	for {
		select {
		case <-ms.ctx.Done():
			return nil
		case msg, ok := <-consumer.Chan():
			if !ok {
				return errors.New("consumer closed")
			}
			consumer.Ack(msg)

			headerMsg := commonpb.MsgHeader{}
			err := proto.Unmarshal(msg.Payload(), &headerMsg)
			if err != nil {
				log.Error("Failed to unmarshal message header", zap.Error(err))
			}
			tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), headerMsg.Base.MsgType)
			if err != nil {
				log.Error("Failed to unmarshal tsMsg", zap.Error(err))
			}
			if tsMsg.Type() == commonpb.MsgType_TimeTick {
				if tsMsg.BeginTs() >= mp.Timestamp {
					return nil
				}
				continue
			}
			if tsMsg.BeginTs() > mp.Timestamp {
X
Xiangyu Wang 已提交
740
				tsMsg.SetPosition(&MsgPosition{
Y
yukun 已提交
741
					ChannelName: filepath.Base(msg.Topic()),
X
xige-16 已提交
742
					MsgID:       msg.ID().Serialize(),
Y
yukun 已提交
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787
				})
				ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
			}
		}
	}
}

//TODO test InMemMsgStream
/*
type InMemMsgStream struct {
	buffer chan *MsgPack
}

func (ms *InMemMsgStream) Start() {}
func (ms *InMemMsgStream) Close() {}

func (ms *InMemMsgStream) ProduceOne(msg TsMsg) error {
	msgPack := MsgPack{}
	msgPack.BeginTs = msg.BeginTs()
	msgPack.EndTs = msg.EndTs()
	msgPack.Msgs = append(msgPack.Msgs, msg)
	buffer <- &msgPack
	return nil
}

func (ms *InMemMsgStream) Produce(msgPack *MsgPack) error {
	buffer <- msgPack
	return nil
}

func (ms *InMemMsgStream) Broadcast(msgPack *MsgPack) error {
	return ms.Produce(msgPack)
}

func (ms *InMemMsgStream) Consume() *MsgPack {
	select {
	case msgPack := <-ms.buffer:
		return msgPack
	}
}

func (ms *InMemMsgStream) Chan() <- chan *MsgPack {
	return buffer
}
*/