pulsar_msgstream_test.go 20.5 KB
Newer Older
X
Xiangyu Wang 已提交
1
package pulsarms
Z
zhenshan.cao 已提交
2 3

import (
4
	"context"
Z
zhenshan.cao 已提交
5
	"fmt"
B
bigsheeper 已提交
6
	"log"
C
cai.zhang 已提交
7
	"os"
Z
zhenshan.cao 已提交
8 9
	"testing"

10 11
	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"

12 13
	"github.com/stretchr/testify/assert"

X
Xiangyu Wang 已提交
14
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
15
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
16
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
C
cai.zhang 已提交
17
	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
Z
zhenshan.cao 已提交
18 19
)

C
cai.zhang 已提交
20 21 22 23 24 25 26 27
// Params is the configuration table shared by the tests in this file; it is
// initialized once in TestMain.
var Params paramtable.BaseTable

// TestMain initializes Params, runs the package's tests and exits the process
// with the status code reported by m.Run.
func TestMain(m *testing.M) {
	Params.Init()
	os.Exit(m.Run())
}

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
// repackFunc is the custom repack function used by the tests. Message i is
// appended to the pack of every channel ID listed in hashKeys[i], so a message
// with several keys ends up in several packs.
//
// It returns an error when hashKeys has fewer entries than msgs, instead of
// panicking on the out-of-range index.
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
	if len(hashKeys) < len(msgs) {
		return nil, fmt.Errorf("repack: got %d hash key lists for %d messages", len(hashKeys), len(msgs))
	}
	result := make(map[int32]*MsgPack)
	for i, request := range msgs {
		for _, channelID := range hashKeys[i] {
			// Single lookup; create the pack lazily the first time a channel is seen.
			pack, ok := result[channelID]
			if !ok {
				pack = &MsgPack{}
				result[channelID] = pack
			}
			pack.Msgs = append(pack.Msgs, request)
		}
	}
	return result, nil
}

func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg {
	baseMsg := BaseMsg{
		BeginTimestamp: 0,
		EndTimestamp:   0,
		HashValues:     []uint32{hashValue},
	}
	switch msgType {
	case commonpb.MsgType_kInsert:
		insertRequest := internalpb2.InsertRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kInsert,
				MsgID:     reqID,
				Timestamp: 11,
				SourceID:  reqID,
			},
			CollectionName: "Collection",
			PartitionName:  "Partition",
			SegmentID:      1,
			ChannelID:      "0",
			Timestamps:     []Timestamp{uint64(reqID)},
			RowIDs:         []int64{1},
			RowData:        []*commonpb.Blob{{}},
		}
		insertMsg := &msgstream.InsertMsg{
			BaseMsg:       baseMsg,
			InsertRequest: insertRequest,
		}
		return insertMsg
	case commonpb.MsgType_kDelete:
		deleteRequest := internalpb2.DeleteRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kDelete,
				MsgID:     reqID,
				Timestamp: 11,
				SourceID:  reqID,
			},
			CollectionName: "Collection",
			ChannelID:      "1",
			Timestamps:     []Timestamp{1},
			PrimaryKeys:    []IntPrimaryKey{1},
		}
		deleteMsg := &msgstream.DeleteMsg{
			BaseMsg:       baseMsg,
			DeleteRequest: deleteRequest,
		}
		return deleteMsg
	case commonpb.MsgType_kSearch:
		searchRequest := internalpb2.SearchRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kSearch,
				MsgID:     reqID,
				Timestamp: 11,
				SourceID:  reqID,
			},
			Query:           nil,
			ResultChannelID: "0",
		}
		searchMsg := &msgstream.SearchMsg{
			BaseMsg:       baseMsg,
			SearchRequest: searchRequest,
		}
		return searchMsg
	case commonpb.MsgType_kSearchResult:
		searchResult := internalpb2.SearchResults{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kSearchResult,
				MsgID:     reqID,
				Timestamp: 1,
				SourceID:  reqID,
			},
Q
quicksilver 已提交
114
			Status:          &commonpb.Status{ErrorCode: commonpb.ErrorCode_ERROR_CODE_SUCCESS},
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
			ResultChannelID: "0",
		}
		searchResultMsg := &msgstream.SearchResultMsg{
			BaseMsg:       baseMsg,
			SearchResults: searchResult,
		}
		return searchResultMsg
	case commonpb.MsgType_kTimeTick:
		timeTickResult := internalpb2.TimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kTimeTick,
				MsgID:     reqID,
				Timestamp: 1,
				SourceID:  reqID,
			},
		}
		timeTickMsg := &TimeTickMsg{
			BaseMsg:     baseMsg,
			TimeTickMsg: timeTickResult,
		}
		return timeTickMsg
	case commonpb.MsgType_kQueryNodeStats:
		queryNodeSegStats := internalpb2.QueryNodeStats{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_kQueryNodeStats,
				SourceID: reqID,
			},
		}
		queryNodeSegStatsMsg := &QueryNodeStatsMsg{
			BaseMsg:        baseMsg,
			QueryNodeStats: queryNodeSegStats,
		}
		return queryNodeSegStatsMsg
	}
	return nil
}

// getTimeTickMsg builds a time-tick test message. reqID is used as both MsgID
// and SourceID of the message base, time as its Timestamp, and hashValue as the
// only entry of HashValues; Begin/End timestamps are zero.
func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg {
	return &TimeTickMsg{
		BaseMsg: BaseMsg{
			BeginTimestamp: 0,
			EndTimestamp:   0,
			HashValues:     []uint32{hashValue},
		},
		TimeTickMsg: internalpb2.TimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kTimeTick,
				MsgID:     reqID,
				Timestamp: time,
				SourceID:  reqID,
			},
		},
	}
}

173
func initPulsarStream(pulsarAddress string,
Z
zhenshan.cao 已提交
174 175 176
	producerChannels []string,
	consumerChannels []string,
	consumerSubName string,
177
	opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
178
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
179 180

	// set input stream
Z
zhenshan.cao 已提交
181 182
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
183 184 185
	for _, opt := range opts {
		inputStream.SetRepackFunc(opt)
	}
186
	inputStream.Start()
X
Xiangyu Wang 已提交
187
	var input msgstream.MsgStream = inputStream
Z
zhenshan.cao 已提交
188 189

	// set output stream
Z
zhenshan.cao 已提交
190 191
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
Z
zhenshan.cao 已提交
192
	outputStream.Start()
X
Xiangyu Wang 已提交
193
	var output msgstream.MsgStream = outputStream
Z
zhenshan.cao 已提交
194

195
	return input, output
196 197 198 199 200 201
}

func initPulsarTtStream(pulsarAddress string,
	producerChannels []string,
	consumerChannels []string,
	consumerSubName string,
202
	opts ...RepackFunc) (msgstream.MsgStream, msgstream.MsgStream) {
203
	factory := msgstream.ProtoUDFactory{}
204 205

	// set input stream
Z
zhenshan.cao 已提交
206 207
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
208 209
	for _, opt := range opts {
		inputStream.SetRepackFunc(opt)
N
neza2017 已提交
210
	}
Z
zhenshan.cao 已提交
211
	inputStream.Start()
X
Xiangyu Wang 已提交
212
	var input msgstream.MsgStream = inputStream
213 214

	// set output stream
G
groot 已提交
215
	outputStream, _ := newPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
Z
zhenshan.cao 已提交
216
	outputStream.AsConsumer(consumerChannels, consumerSubName)
217
	outputStream.Start()
X
Xiangyu Wang 已提交
218
	var output msgstream.MsgStream = outputStream
219

220
	return input, output
221
}
N
neza2017 已提交
222

223
func receiveMsg(outputStream msgstream.MsgStream, msgCount int) {
Z
zhenshan.cao 已提交
224 225
	receiveCount := 0
	for {
226
		result, _ := outputStream.Consume()
Z
zhenshan.cao 已提交
227 228 229 230
		if len(result.Msgs) > 0 {
			msgs := result.Msgs
			for _, v := range msgs {
				receiveCount++
X
xige-16 已提交
231
				fmt.Println("msg type: ", v.Type(), ", msg value: ", v)
Z
zhenshan.cao 已提交
232 233
			}
		}
234 235
		if receiveCount >= msgCount {
			break
Z
zhenshan.cao 已提交
236 237 238 239
		}
	}
}

240
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
X
XuanYang-cn 已提交
241
	ctx := context.Background()
C
cai.zhang 已提交
242
	pulsarAddress, _ := Params.Load("_PulsarAddress")
243 244 245 246
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
Z
zhenshan.cao 已提交
247

X
Xiangyu Wang 已提交
248
	msgPack := msgstream.MsgPack{}
249 250
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
Z
zhenshan.cao 已提交
251

252
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
253
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
254 255 256 257
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}

258
	receiveMsg(outputStream, len(msgPack.Msgs))
259 260
	inputStream.Close()
	outputStream.Close()
261

Z
zhenshan.cao 已提交
262 263
}

264
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
X
XuanYang-cn 已提交
265
	ctx := context.Background()
C
cai.zhang 已提交
266
	pulsarAddress, _ := Params.Load("_PulsarAddress")
267 268 269 270
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}
	consumerSubName := funcutil.RandomString(8)
X
Xiangyu Wang 已提交
271
	msgPack := msgstream.MsgPack{}
272
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 1, 1))
273
	//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3))
Z
zhenshan.cao 已提交
274

275
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
276
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
277 278 279
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
280
	receiveMsg(outputStream, len(msgPack.Msgs))
281 282
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
283 284
}

285
func TestStream_PulsarMsgStream_Search(t *testing.T) {
X
XuanYang-cn 已提交
286
	ctx := context.Background()
C
cai.zhang 已提交
287
	pulsarAddress, _ := Params.Load("_PulsarAddress")
288 289 290 291
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}
	consumerSubName := funcutil.RandomString(8)
Z
zhenshan.cao 已提交
292

X
Xiangyu Wang 已提交
293
	msgPack := msgstream.MsgPack{}
294 295
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 3, 3))
Z
zhenshan.cao 已提交
296

297
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
298
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
299 300 301
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
302
	receiveMsg(outputStream, len(msgPack.Msgs))
303 304
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
305 306
}

307
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
X
XuanYang-cn 已提交
308
	ctx := context.Background()
C
cai.zhang 已提交
309
	pulsarAddress, _ := Params.Load("_PulsarAddress")
310 311 312 313
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}
	consumerSubName := funcutil.RandomString(8)
X
Xiangyu Wang 已提交
314
	msgPack := msgstream.MsgPack{}
315 316
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
Z
zhenshan.cao 已提交
317

318
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
319
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
320 321 322
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
323
	receiveMsg(outputStream, len(msgPack.Msgs))
324 325
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
326 327
}

328
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
X
XuanYang-cn 已提交
329
	ctx := context.Background()
C
cai.zhang 已提交
330
	pulsarAddress, _ := Params.Load("_PulsarAddress")
331 332 333 334
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}
	consumerSubName := funcutil.RandomString(8)
X
Xiangyu Wang 已提交
335
	msgPack := msgstream.MsgPack{}
336 337
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
Z
zhenshan.cao 已提交
338

339
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
340
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
341 342 343
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
344
	receiveMsg(outputStream, len(msgPack.Msgs))
345 346
	inputStream.Close()
	outputStream.Close()
Z
zhenshan.cao 已提交
347
}
C
cai.zhang 已提交
348

349
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
X
XuanYang-cn 已提交
350
	ctx := context.Background()
C
cai.zhang 已提交
351
	pulsarAddress, _ := Params.Load("_PulsarAddress")
352 353 354 355
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
C
cai.zhang 已提交
356

X
Xiangyu Wang 已提交
357
	msgPack := msgstream.MsgPack{}
358 359
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
C
cai.zhang 已提交
360

361
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
362
	err := inputStream.Broadcast(ctx, &msgPack)
B
bigsheeper 已提交
363 364 365
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
366
	receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs))
367 368
	inputStream.Close()
	outputStream.Close()
369 370 371
}

func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
X
XuanYang-cn 已提交
372
	ctx := context.Background()
C
cai.zhang 已提交
373
	pulsarAddress, _ := Params.Load("_PulsarAddress")
374 375 376 377
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
378

X
Xiangyu Wang 已提交
379
	msgPack := msgstream.MsgPack{}
380 381
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
382

383
	inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
X
XuanYang-cn 已提交
384
	err := inputStream.Produce(ctx, &msgPack)
B
bigsheeper 已提交
385 386 387
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
388
	receiveMsg(outputStream, len(msgPack.Msgs))
389 390
	inputStream.Close()
	outputStream.Close()
391 392
}

B
bigsheeper 已提交
393
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
X
XuanYang-cn 已提交
394
	ctx := context.Background()
C
cai.zhang 已提交
395
	pulsarAddress, _ := Params.Load("_PulsarAddress")
396 397 398 399
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
X
Xiangyu Wang 已提交
400
	baseMsg := msgstream.BaseMsg{
B
bigsheeper 已提交
401 402
		BeginTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
403
		HashValues:     []uint32{1, 3},
B
bigsheeper 已提交
404 405
	}

406 407 408 409 410 411 412
	insertRequest := internalpb2.InsertRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kInsert,
			MsgID:     1,
			Timestamp: 1,
			SourceID:  1,
		},
B
bigsheeper 已提交
413
		CollectionName: "Collection",
414
		PartitionName:  "Partition",
415
		SegmentID:      1,
416
		ChannelID:      "1",
X
Xiangyu Wang 已提交
417
		Timestamps:     []msgstream.Timestamp{1, 1},
418
		RowIDs:         []int64{1, 3},
419
		RowData:        []*commonpb.Blob{{}, {}},
B
bigsheeper 已提交
420
	}
X
Xiangyu Wang 已提交
421
	insertMsg := &msgstream.InsertMsg{
B
bigsheeper 已提交
422 423 424
		BaseMsg:       baseMsg,
		InsertRequest: insertRequest,
	}
X
xige-16 已提交
425

X
Xiangyu Wang 已提交
426
	msgPack := msgstream.MsgPack{}
X
xige-16 已提交
427
	msgPack.Msgs = append(msgPack.Msgs, insertMsg)
B
bigsheeper 已提交
428

429
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
430 431
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
432 433
	inputStream.Start()

Z
zhenshan.cao 已提交
434 435
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
436
	outputStream.Start()
X
Xiangyu Wang 已提交
437
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
438

X
XuanYang-cn 已提交
439
	err := (*inputStream).Produce(ctx, &msgPack)
B
bigsheeper 已提交
440 441 442
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
443
	receiveMsg(output, len(msgPack.Msgs)*2)
B
bigsheeper 已提交
444 445 446 447 448
	(*inputStream).Close()
	(*outputStream).Close()
}

func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
X
XuanYang-cn 已提交
449
	ctx := context.Background()
C
cai.zhang 已提交
450
	pulsarAddress, _ := Params.Load("_PulsarAddress")
451 452 453 454
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
B
bigsheeper 已提交
455

X
Xiangyu Wang 已提交
456
	baseMsg := msgstream.BaseMsg{
B
bigsheeper 已提交
457 458
		BeginTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
459
		HashValues:     []uint32{1, 3},
B
bigsheeper 已提交
460 461
	}

462 463 464 465 466 467 468
	deleteRequest := internalpb2.DeleteRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kDelete,
			MsgID:     1,
			Timestamp: 1,
			SourceID:  1,
		},
B
bigsheeper 已提交
469
		CollectionName: "Collection",
470
		ChannelID:      "1",
X
Xiangyu Wang 已提交
471
		Timestamps:     []msgstream.Timestamp{1, 1},
B
bigsheeper 已提交
472 473
		PrimaryKeys:    []int64{1, 3},
	}
X
Xiangyu Wang 已提交
474
	deleteMsg := &msgstream.DeleteMsg{
B
bigsheeper 已提交
475 476 477
		BaseMsg:       baseMsg,
		DeleteRequest: deleteRequest,
	}
X
xige-16 已提交
478

X
Xiangyu Wang 已提交
479
	msgPack := msgstream.MsgPack{}
X
xige-16 已提交
480
	msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
B
bigsheeper 已提交
481

482
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
483 484
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
485 486
	inputStream.Start()

Z
zhenshan.cao 已提交
487 488
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
489
	outputStream.Start()
X
Xiangyu Wang 已提交
490
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
491

X
XuanYang-cn 已提交
492
	err := (*inputStream).Produce(ctx, &msgPack)
B
bigsheeper 已提交
493 494 495
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
496
	receiveMsg(output, len(msgPack.Msgs)*2)
B
bigsheeper 已提交
497 498 499 500 501
	(*inputStream).Close()
	(*outputStream).Close()
}

func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
X
XuanYang-cn 已提交
502
	ctx := context.Background()
C
cai.zhang 已提交
503
	pulsarAddress, _ := Params.Load("_PulsarAddress")
504 505 506 507
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
B
bigsheeper 已提交
508

X
Xiangyu Wang 已提交
509
	msgPack := msgstream.MsgPack{}
510 511 512 513
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 2, 2))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
B
bigsheeper 已提交
514

515
	factory := msgstream.ProtoUDFactory{}
Z
zhenshan.cao 已提交
516 517
	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	inputStream.AsProducer(producerChannels)
B
bigsheeper 已提交
518 519
	inputStream.Start()

Z
zhenshan.cao 已提交
520 521
	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
	outputStream.AsConsumer(consumerChannels, consumerSubName)
B
bigsheeper 已提交
522
	outputStream.Start()
X
Xiangyu Wang 已提交
523
	var output msgstream.MsgStream = outputStream
B
bigsheeper 已提交
524

X
XuanYang-cn 已提交
525
	err := (*inputStream).Produce(ctx, &msgPack)
B
bigsheeper 已提交
526 527 528
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
529
	receiveMsg(output, len(msgPack.Msgs))
B
bigsheeper 已提交
530 531 532 533
	(*inputStream).Close()
	(*outputStream).Close()
}

534
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
X
XuanYang-cn 已提交
535
	ctx := context.Background()
C
cai.zhang 已提交
536
	pulsarAddress, _ := Params.Load("_PulsarAddress")
537 538 539 540
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
X
Xiangyu Wang 已提交
541
	msgPack0 := msgstream.MsgPack{}
542
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
543

X
Xiangyu Wang 已提交
544
	msgPack1 := msgstream.MsgPack{}
545 546
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
547

X
Xiangyu Wang 已提交
548
	msgPack2 := msgstream.MsgPack{}
549
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
550 551

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
552
	err := inputStream.Broadcast(ctx, &msgPack0)
553 554 555
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
X
XuanYang-cn 已提交
556
	err = inputStream.Produce(ctx, &msgPack1)
557 558 559
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
X
XuanYang-cn 已提交
560
	err = inputStream.Broadcast(ctx, &msgPack2)
561 562 563 564
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
	receiveMsg(outputStream, len(msgPack1.Msgs))
565 566 567 568 569
	inputStream.Close()
	outputStream.Close()
}

func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
X
XuanYang-cn 已提交
570
	ctx := context.Background()
571
	pulsarAddress, _ := Params.Load("_PulsarAddress")
572 573 574 575
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591

	msgPack0 := MsgPack{}
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))

	msgPack1 := MsgPack{}
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 19, 19))

	msgPack2 := MsgPack{}
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))

	msgPack3 := MsgPack{}
	msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 14, 14))
	msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 9, 9))

	msgPack4 := MsgPack{}
X
XuanYang-cn 已提交
592
	msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11))
593 594 595 596 597

	msgPack5 := MsgPack{}
	msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15))

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
598
	err := inputStream.Broadcast(ctx, &msgPack0)
599
	assert.Nil(t, err)
X
XuanYang-cn 已提交
600
	err = inputStream.Produce(ctx, &msgPack1)
601
	assert.Nil(t, err)
X
XuanYang-cn 已提交
602
	err = inputStream.Broadcast(ctx, &msgPack2)
603
	assert.Nil(t, err)
X
XuanYang-cn 已提交
604
	err = inputStream.Produce(ctx, &msgPack3)
605
	assert.Nil(t, err)
X
XuanYang-cn 已提交
606
	err = inputStream.Broadcast(ctx, &msgPack4)
607 608 609
	assert.Nil(t, err)

	outputStream.Consume()
610
	receivedMsg, _ := outputStream.Consume()
611 612 613
	for _, position := range receivedMsg.StartPositions {
		outputStream.Seek(position)
	}
X
XuanYang-cn 已提交
614
	err = inputStream.Broadcast(ctx, &msgPack5)
615
	assert.Nil(t, err)
616
	seekMsg, _ := outputStream.Consume()
617 618 619 620 621
	for _, msg := range seekMsg.Msgs {
		assert.Equal(t, msg.BeginTs(), uint64(14))
	}
	inputStream.Close()
	outputStream.Close()
622 623 624
}

func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
X
XuanYang-cn 已提交
625
	ctx := context.Background()
626
	pulsarAddress, _ := Params.Load("_PulsarAddress")
627 628 629 630
	c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
	producerChannels := []string{c1, c2}
	consumerChannels := []string{c1, c2}
	consumerSubName := funcutil.RandomString(8)
631

X
Xiangyu Wang 已提交
632
	msgPack0 := msgstream.MsgPack{}
633
	msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
634

X
Xiangyu Wang 已提交
635
	msgPack1 := msgstream.MsgPack{}
636 637
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
	msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
638

X
Xiangyu Wang 已提交
639
	msgPack2 := msgstream.MsgPack{}
640
	msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
641 642

	inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
X
XuanYang-cn 已提交
643
	err := inputStream.Broadcast(ctx, &msgPack0)
B
bigsheeper 已提交
644 645 646
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
X
XuanYang-cn 已提交
647
	err = inputStream.Produce(ctx, &msgPack1)
B
bigsheeper 已提交
648 649 650
	if err != nil {
		log.Fatalf("produce error = %v", err)
	}
X
XuanYang-cn 已提交
651
	err = inputStream.Broadcast(ctx, &msgPack2)
B
bigsheeper 已提交
652 653 654
	if err != nil {
		log.Fatalf("broadcast error = %v", err)
	}
655
	receiveMsg(outputStream, len(msgPack1.Msgs))
656 657
	inputStream.Close()
	outputStream.Close()
C
cai.zhang 已提交
658
}