pulsar_msgstream_test.go 19.4 KB
Newer Older
X
Xiangyu Wang 已提交
1
package pulsarms
Z
zhenshan.cao 已提交
2 3

import (
4
	"context"
Z
zhenshan.cao 已提交
5
	"fmt"
B
bigsheeper 已提交
6
	"log"
C
cai.zhang 已提交
7
	"os"
Z
zhenshan.cao 已提交
8 9
	"testing"

10 11
	"github.com/stretchr/testify/assert"

X
Xiangyu Wang 已提交
12
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
13
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
14
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
C
cai.zhang 已提交
15
	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
Z
zhenshan.cao 已提交
16 17
)

C
cai.zhang 已提交
18 19 20 21 22 23 24 25
// Params is the package-level parameter table. TestMain initializes it, and
// the tests in this file load the "_PulsarAddress" entry from it.
var Params paramtable.BaseTable

// TestMain initializes the shared parameter table before any test runs and
// then exits the process with the status code reported by the test run.
func TestMain(m *testing.M) {
	Params.Init()
	os.Exit(m.Run())
}

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
// repackFunc groups msgs by channel ID.
//
// hashKeys[i] lists the channel IDs that msgs[i] is routed to. A message is
// appended to the pack of every channel in its list, so the same message may
// appear in more than one pack. Packs are created lazily, so only channels
// that receive at least one message appear in the result.
//
// It returns an error (instead of panicking on an out-of-range index) when
// hashKeys has fewer entries than msgs.
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
	if len(hashKeys) < len(msgs) {
		return nil, fmt.Errorf("repack: %d messages but only %d hash key lists", len(msgs), len(hashKeys))
	}

	result := make(map[int32]*MsgPack)
	for i, request := range msgs {
		for _, channelID := range hashKeys[i] {
			pack, ok := result[channelID]
			if !ok {
				pack = &MsgPack{}
				result[channelID] = pack
			}
			pack.Msgs = append(pack.Msgs, request)
		}
	}
	return result, nil
}

// getTsMsg builds a test message of the requested type.
//
// reqID is used as both MsgID and SourceID of the message base (the
// query-node-stats message sets only SourceID), and hashValue becomes the
// single entry of the message's HashValues. For insert messages reqID is also
// used as the row timestamp. It returns nil for any message type that is not
// handled by the switch below.
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
	// Begin/End timestamps are left at their zero value.
	base := BaseMsg{HashValues: []uint32{hashValue}}

	switch msgType {
	case commonpb.MsgType_kInsert:
		return &msgstream.InsertMsg{
			BaseMsg: base,
			InsertRequest: internalpb2.InsertRequest{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_kInsert,
					MsgID:     reqID,
					Timestamp: 11,
					SourceID:  reqID,
				},
				CollectionName: "Collection",
				PartitionName:  "Partition",
				SegmentID:      1,
				ChannelID:      "0",
				Timestamps:     []Timestamp{uint64(reqID)},
				RowIDs:         []int64{1},
				RowData:        []*commonpb.Blob{{}},
			},
		}

	case commonpb.MsgType_kDelete:
		return &msgstream.DeleteMsg{
			BaseMsg: base,
			DeleteRequest: internalpb2.DeleteRequest{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_kDelete,
					MsgID:     reqID,
					Timestamp: 11,
					SourceID:  reqID,
				},
				CollectionName: "Collection",
				ChannelID:      "1",
				Timestamps:     []Timestamp{1},
				PrimaryKeys:    []IntPrimaryKey{1},
			},
		}

	case commonpb.MsgType_kSearch:
		return &msgstream.SearchMsg{
			BaseMsg: base,
			SearchRequest: internalpb2.SearchRequest{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_kSearch,
					MsgID:     reqID,
					Timestamp: 11,
					SourceID:  reqID,
				},
				Query:           nil,
				ResultChannelID: "0",
			},
		}

	case commonpb.MsgType_kSearchResult:
		return &msgstream.SearchResultMsg{
			BaseMsg: base,
			SearchResults: internalpb2.SearchResults{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_kSearchResult,
					MsgID:     reqID,
					Timestamp: 1,
					SourceID:  reqID,
				},
				Status:          &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
				ResultChannelID: "0",
			},
		}

	case commonpb.MsgType_kTimeTick:
		return &TimeTickMsg{
			BaseMsg: base,
			TimeTickMsg: internalpb2.TimeTickMsg{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_kTimeTick,
					MsgID:     reqID,
					Timestamp: 1,
					SourceID:  reqID,
				},
			},
		}

	case commonpb.MsgType_kQueryNodeStats:
		return &QueryNodeStatsMsg{
			BaseMsg: base,
			QueryNodeStats: internalpb2.QueryNodeStats{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_kQueryNodeStats,
					SourceID: reqID,
				},
			},
		}
	}

	return nil
}

// getTimeTickMsg builds a time-tick test message whose base carries reqID as
// MsgID and SourceID and time as its Timestamp. hashValue becomes the single
// entry of the message's HashValues.
func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
	return &TimeTickMsg{
		// Begin/End timestamps are left at their zero value.
		BaseMsg: BaseMsg{HashValues: []uint32{hashValue}},
		TimeTickMsg: internalpb2.TimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kTimeTick,
				MsgID:     reqID,
				Timestamp: time,
				SourceID:  reqID,
			},
		},
	}
}

171
func initPulsarStream(pulsarAddress string,
Z
zhenshan.cao 已提交
172 173 174
	producerChannels []string,
	consumerChannels []string,
	consumerSubName string,
175
	opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
176
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
177 178

	// set input stream
Z
zhenshan.cao 已提交
179 180
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
181 182 183
	for _, opt := range opts {
		inputStream.SetRepackFunc(opt)
	}
184
	inputStream.Start()
X
Xiangyu Wang 已提交
185
	var input msgstream.MsgStream = inputStream
Z
zhenshan.cao 已提交
186 187

	// set output stream
Z
zhenshan.cao 已提交
188 189
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
Z
zhenshan.cao 已提交
190
	outputStream.Start()
X
Xiangyu Wang 已提交
191
	var output msgstream.MsgStream = outputStream
Z
zhenshan.cao 已提交
192

193
	return input, output
194 195 196 197 198 199
}

func initPulsarTtStream(pulsarAddress string,
	producerChannels []string,
	consumerChannels []string,
	consumerSubName string,
200
	opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
201
	factory := msgstream.ProtoUDFactory{}
202 203

	// set input stream
Z
zhenshan.cao 已提交
204 205
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
206 207
	for _, opt := range opts {
		inputStream.SetRepackFunc(opt)
N
neza2017 已提交
208
	}
Z
zhenshan.cao 已提交
209
	inputStream.Start()
X
Xiangyu Wang 已提交
210
	var input msgstream.MsgStream = inputStream
211 212

	// set output stream
Z
zhenshan.cao 已提交
213 214
	outputStream, _ := NewPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
215
	outputStream.Start()
X
Xiangyu Wang 已提交
216
	var output msgstream.MsgStream = outputStream
217

218
	return input, output
219
}
N
neza2017 已提交
220

221
func receiveMsg(outputStream msgstream.MsgStream, msgCount int) {
Z
zhenshan.cao 已提交
222 223
	receiveCount := 0
	for {
224
		result := outputStream.Consume()
Z
zhenshan.cao 已提交
225 226 227 228
		if len(result.Msgs) > 0 {
			msgs := result.Msgs
			for _, v := range msgs {
				receiveCount++
X
xige-16 已提交
229
				fmt.Println("msg type: ", v.Type(), ", msg value: ", v)
Z
zhenshan.cao 已提交
230 231
			}
		}
232 233
		if receiveCount >= msgCount {
			break
Z
zhenshan.cao 已提交
234 235 236 237
		}
	}
}

238
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
C
cai.zhang 已提交
239
	pulsarAddress, _ := Params.Load("_PulsarAddress")
Z
zhenshan.cao 已提交
240 241 242 243
	producerChannels := []string{"insert1", "insert2"}
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
244
	msgPack := msgstream.MsgPack{}
245 246
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
Z
zhenshan.cao 已提交
247

248
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
249
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
250 251 252 253
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}

254
	receiveMsg(outputStream, len(msgPack.Msgs))
255 256
	inputStream.Close()
	outputStream.Close()
257

Z
zhenshan.cao 已提交
258 259
}

260
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
C
cai.zhang 已提交
261
	pulsarAddress, _ := Params.Load("_PulsarAddress")
Z
zhenshan.cao 已提交
262 263 264 265
	producerChannels := []string{"delete"}
	consumerChannels := []string{"delete"}
	consumerSubName := "subDelete"

X
Xiangyu Wang 已提交
266
	msgPack := msgstream.MsgPack{}
267
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 1, 1))
268
	//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3))
Z
zhenshan.cao 已提交
269

270
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
271
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
272 273 274
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
275
	receiveMsg(outputStream, len(msgPack.Msgs))
276 277
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
278 279
}

280
func TestStream_PulsarMsgStream_Search(t *testing.T) {
C
cai.zhang 已提交
281
	pulsarAddress, _ := Params.Load("_PulsarAddress")
Z
zhenshan.cao 已提交
282 283 284 285
	producerChannels := []string{"search"}
	consumerChannels := []string{"search"}
	consumerSubName := "subSearch"

X
Xiangyu Wang 已提交
286
	msgPack := msgstream.MsgPack{}
287 288
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 3, 3))
Z
zhenshan.cao 已提交
289

290
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
291
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
292 293 294
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
295
	receiveMsg(outputStream, len(msgPack.Msgs))
296 297
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
298 299
}

300
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
C
cai.zhang 已提交
301
	pulsarAddress, _ := Params.Load("_PulsarAddress")
302 303 304
	producerChannels := []string{"searchResult"}
	consumerChannels := []string{"searchResult"}
	consumerSubName := "subSearchResult"
Z
zhenshan.cao 已提交
305

X
Xiangyu Wang 已提交
306
	msgPack := msgstream.MsgPack{}
307 308
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
Z
zhenshan.cao 已提交
309

310
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
311
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
312 313 314
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
315
	receiveMsg(outputStream, len(msgPack.Msgs))
316 317
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
318 319
}

320
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
C
cai.zhang 已提交
321
	pulsarAddress, _ := Params.Load("_PulsarAddress")
322 323 324
	producerChannels := []string{"timeTick"}
	consumerChannels := []string{"timeTick"}
	consumerSubName := "subTimeTick"
Z
zhenshan.cao 已提交
325

X
Xiangyu Wang 已提交
326
	msgPack := msgstream.MsgPack{}
327 328
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
Z
zhenshan.cao 已提交
329

330
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
331
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
332 333 334
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
335
	receiveMsg(outputStream, len(msgPack.Msgs))
336 337
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
338
}
C
cai.zhang 已提交
339

340
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
C
cai.zhang 已提交
341
	pulsarAddress, _ := Params.Load("_PulsarAddress")
N
neza2017 已提交
342
	producerChannels := []string{"insert1", "insert2"}
343
	consumerChannels := []string{"insert1", "insert2"}
C
cai.zhang 已提交
344 345
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
346
	msgPack := msgstream.MsgPack{}
347 348
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
C
cai.zhang 已提交
349

350
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
351
	err := inputStream.Broadcast(&msgPack)
B
bigsheeper 已提交
352 353 354
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
355
	receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs))
356 357
	inputStream.Close()
	outputStream.Close()
358 359 360
}

func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
C
cai.zhang 已提交
361
	pulsarAddress, _ := Params.Load("_PulsarAddress")
362 363 364 365
	producerChannels := []string{"insert1", "insert2"}
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
366
	msgPack := msgstream.MsgPack{}
367 368
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
369

370 371
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
	err := inputStream.Produce(&msgPack)
B
bigsheeper 已提交
372 373 374
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
375
	receiveMsg(outputStream, len(msgPack.Msgs))
376 377
	inputStream.Close()
	outputStream.Close()
378 379
}

B
bigsheeper 已提交
380
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
C
cai.zhang 已提交
381
	pulsarAddress, _ := Params.Load("_PulsarAddress")
B
bigsheeper 已提交
382 383 384 385
	producerChannels := []string{"insert1", "insert2"}
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
386
	baseMsg := msgstream.BaseMsg{
B
bigsheeper 已提交
387 388
		BeginTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
389
		HashValues:     []uint32{1, 3},
B
bigsheeper 已提交
390 391
	}

392 393 394 395 396 397 398
	insertRequest := internalpb2.InsertRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kInsert,
			MsgID:     1,
			Timestamp: 1,
			SourceID:  1,
		},
B
bigsheeper 已提交
399
		CollectionName: "Collection",
400
		PartitionName:  "Partition",
401
		SegmentID:      1,
402
		ChannelID:      "1",
X
Xiangyu Wang 已提交
403
		Timestamps:     []msgstream.Timestamp{1, 1},
404
		RowIDs:         []int64{1, 3},
405
		RowData:        []*commonpb.Blob{{}, {}},
B
bigsheeper 已提交
406
	}
X
Xiangyu Wang 已提交
407
	insertMsg := &msgstream.InsertMsg{
B
bigsheeper 已提交
408 409 410
		BaseMsg:       baseMsg,
		InsertRequest: insertRequest,
	}
X
xige-16 已提交
411

X
Xiangyu Wang 已提交
412
	msgPack := msgstream.MsgPack{}
X
xige-16 已提交
413
	msgPack.Msgs = append(msgPack.Msgs, insertMsg)
B
bigsheeper 已提交
414

415
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
416 417
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
418 419
	inputStream.Start()

Z
zhenshan.cao 已提交
420 421
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
422
	outputStream.Start()
X
Xiangyu Wang 已提交
423
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
424 425 426 427 428

	err := (*inputStream).Produce(&msgPack)
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
429
	receiveMsg(output, len(msgPack.Msgs)*2)
B
bigsheeper 已提交
430 431 432 433 434
	(*inputStream).Close()
	(*outputStream).Close()
}

func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
C
cai.zhang 已提交
435
	pulsarAddress, _ := Params.Load("_PulsarAddress")
B
bigsheeper 已提交
436 437 438 439
	producerChannels := []string{"insert1", "insert2"}
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
440
	baseMsg := msgstream.BaseMsg{
B
bigsheeper 已提交
441 442
		BeginTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
443
		HashValues:     []uint32{1, 3},
B
bigsheeper 已提交
444 445
	}

446 447 448 449 450 451 452
	deleteRequest := internalpb2.DeleteRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kDelete,
			MsgID:     1,
			Timestamp: 1,
			SourceID:  1,
		},
B
bigsheeper 已提交
453
		CollectionName: "Collection",
454
		ChannelID:      "1",
X
Xiangyu Wang 已提交
455
		Timestamps:     []msgstream.Timestamp{1, 1},
B
bigsheeper 已提交
456 457
		PrimaryKeys:    []int64{1, 3},
	}
X
Xiangyu Wang 已提交
458
	deleteMsg := &msgstream.DeleteMsg{
B
bigsheeper 已提交
459 460 461
		BaseMsg:       baseMsg,
		DeleteRequest: deleteRequest,
	}
X
xige-16 已提交
462

X
Xiangyu Wang 已提交
463
	msgPack := msgstream.MsgPack{}
X
xige-16 已提交
464
	msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
B
bigsheeper 已提交
465

466
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
467 468
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
469 470
	inputStream.Start()

Z
zhenshan.cao 已提交
471 472
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
473
	outputStream.Start()
X
Xiangyu Wang 已提交
474
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
475 476 477 478 479

	err := (*inputStream).Produce(&msgPack)
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
480
	receiveMsg(output, len(msgPack.Msgs)*2)
B
bigsheeper 已提交
481 482 483 484 485
	(*inputStream).Close()
	(*outputStream).Close()
}

func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
C
cai.zhang 已提交
486
	pulsarAddress, _ := Params.Load("_PulsarAddress")
B
bigsheeper 已提交
487 488 489 490
	producerChannels := []string{"insert1", "insert2"}
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
491
	msgPack := msgstream.MsgPack{}
492 493 494 495
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 2, 2))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
B
bigsheeper 已提交
496

497
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
498 499
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
500 501
	inputStream.Start()

Z
zhenshan.cao 已提交
502 503
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
504
	outputStream.Start()
X
Xiangyu Wang 已提交
505
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
506 507 508 509 510

	err := (*inputStream).Produce(&msgPack)
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
511
	receiveMsg(output, len(msgPack.Msgs))
B
bigsheeper 已提交
512 513 514 515
	(*inputStream).Close()
	(*outputStream).Close()
}

516
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
C
cai.zhang 已提交
517
	pulsarAddress, _ := Params.Load("_PulsarAddress")
518
	producerChannels := []string{"insert1", "insert2"}
519 520 521
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
522
	msgPack0 := msgstream.MsgPack{}
523
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
524

X
Xiangyu Wang 已提交
525
	msgPack1 := msgstream.MsgPack{}
526 527
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
528

X
Xiangyu Wang 已提交
529
	msgPack2 := msgstream.MsgPack{}
530
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
531 532

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
533
	err := inputStream.Broadcast(&msgPack0)
534 535 536
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
537
	err = inputStream.Produce(&msgPack1)
538 539 540
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
541
	err = inputStream.Broadcast(&msgPack2)
542 543 544 545
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
	receiveMsg(outputStream, len(msgPack1.Msgs))
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
	inputStream.Close()
	outputStream.Close()
}

func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
	pulsarAddress, _ := Params.Load("_PulsarAddress")
	producerChannels := []string{"seek_insert1", "seek_insert2"}
	consumerChannels := []string{"seek_insert1", "seek_insert2"}
	consumerSubName := "subInsert"

	msgPack0 := MsgPack{}
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))

	msgPack1 := MsgPack{}
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 19, 19))

	msgPack2 := MsgPack{}
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))

	msgPack3 := MsgPack{}
	msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 14, 14))
	msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 9, 9))

	msgPack4 := MsgPack{}
X
XuanYang-cn 已提交
571
	msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11))
572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600

	msgPack5 := MsgPack{}
	msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15))

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
	err := inputStream.Broadcast(&msgPack0)
	assert.Nil(t, err)
	err = inputStream.Produce(&msgPack1)
	assert.Nil(t, err)
	err = inputStream.Broadcast(&msgPack2)
	assert.Nil(t, err)
	err = inputStream.Produce(&msgPack3)
	assert.Nil(t, err)
	err = inputStream.Broadcast(&msgPack4)
	assert.Nil(t, err)

	outputStream.Consume()
	receivedMsg := outputStream.Consume()
	for _, position := range receivedMsg.StartPositions {
		outputStream.Seek(position)
	}
	err = inputStream.Broadcast(&msgPack5)
	assert.Nil(t, err)
	seekMsg := outputStream.Consume()
	for _, msg := range seekMsg.Msgs {
		assert.Equal(t, msg.BeginTs(), uint64(14))
	}
	inputStream.Close()
	outputStream.Close()
601 602 603 604 605
}

func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
	pulsarAddress, _ := Params.Load("_PulsarAddress")
	producerChannels := []string{"insert1", "insert2"}
606 607 608
	consumerChannels := []string{"insert1", "insert2"}
	consumerSubName := "subInsert"

X
Xiangyu Wang 已提交
609
	msgPack0 := msgstream.MsgPack{}
610
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
611

X
Xiangyu Wang 已提交
612
	msgPack1 := msgstream.MsgPack{}
613 614
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
615

X
Xiangyu Wang 已提交
616
	msgPack2 := msgstream.MsgPack{}
617
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
618 619

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
620
	err := inputStream.Broadcast(&msgPack0)
B
bigsheeper 已提交
621 622 623
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
624
	err = inputStream.Produce(&msgPack1)
B
bigsheeper 已提交
625 626 627
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
628
	err = inputStream.Broadcast(&msgPack2)
B
bigsheeper 已提交
629 630 631
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
632
	receiveMsg(outputStream, len(msgPack1.Msgs))
633 634
	inputStream.Close()
	outputStream.Close()
C
cai.zhang 已提交
635
}