提交 bfb7ca0d 编写于 作者: D dragondriver 提交者: yefu.chen

Add unittest to time tick logic in proxy service

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 decc80a5
package msgstream
import "sync"
type SimpleMsgStream struct {
msgChan chan *MsgPack
msgCount int
msgCountMtx sync.RWMutex
}
func (ms *SimpleMsgStream) Start() {
}
func (ms *SimpleMsgStream) Close() {
}
func (ms *SimpleMsgStream) Chan() <-chan *MsgPack {
return ms.msgChan
}
func (ms *SimpleMsgStream) AsProducer(channels []string) {
}
func (ms *SimpleMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *SimpleMsgStream) SetRepackFunc(repackFunc RepackFunc) {
}
func (ms *SimpleMsgStream) Produce(pack *MsgPack) error {
ms.msgCountMtx.Lock()
defer ms.msgCountMtx.Unlock()
ms.msgChan <- pack
ms.msgCount++
return nil
}
func (ms *SimpleMsgStream) Broadcast(pack *MsgPack) error {
return nil
}
func (ms *SimpleMsgStream) Consume() *MsgPack {
ms.msgCountMtx.RLock()
defer ms.msgCountMtx.RUnlock()
if ms.msgCount <= 0 {
return nil
}
return <-ms.msgChan
}
func (ms *SimpleMsgStream) Seek(offset *MsgPosition) error {
return nil
}
func NewSimpleMsgStream() *SimpleMsgStream {
return &SimpleMsgStream{
msgChan: make(chan *MsgPack, 1024),
msgCount: 0,
}
}
......@@ -20,7 +20,7 @@ import (
type TaskQueue interface {
utChan() <-chan int
UTEmpty() bool
utEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
......@@ -50,7 +50,7 @@ func (queue *BaseTaskQueue) utChan() <-chan int {
return queue.utBufChan
}
func (queue *BaseTaskQueue) UTEmpty() bool {
func (queue *BaseTaskQueue) utEmpty() bool {
queue.utLock.Lock()
defer queue.utLock.Unlock()
return queue.unissuedTasks.Len() == 0
......@@ -343,7 +343,7 @@ func (sched *TaskScheduler) definitionLoop() {
case <-sched.ctx.Done():
return
case <-sched.DdQueue.utChan():
if !sched.DdQueue.UTEmpty() {
if !sched.DdQueue.utEmpty() {
t := sched.scheduleDdTask()
sched.processTask(t, sched.DdQueue)
}
......@@ -358,7 +358,7 @@ func (sched *TaskScheduler) manipulationLoop() {
case <-sched.ctx.Done():
return
case <-sched.DmQueue.utChan():
if !sched.DmQueue.UTEmpty() {
if !sched.DmQueue.utEmpty() {
t := sched.scheduleDmTask()
go sched.processTask(t, sched.DmQueue)
}
......@@ -374,7 +374,7 @@ func (sched *TaskScheduler) queryLoop() {
case <-sched.ctx.Done():
return
case <-sched.DqQueue.utChan():
if !sched.DqQueue.UTEmpty() {
if !sched.DqQueue.utEmpty() {
t := sched.scheduleDqTask()
go sched.processTask(t, sched.DqQueue)
} else {
......
package proxyservice
import (
"context"
"math"
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
func ttStreamProduceLoop(ctx context.Context, ttStream msgstream.MsgStream, durationInterval time.Duration, sourceID int64) {
log.Debug("ttStreamProduceLoop", zap.Any("durationInterval", durationInterval))
timer := time.NewTicker(durationInterval)
go func() {
for {
select {
case <-ctx.Done():
return
case <-timer.C:
ttMsgs := &msgstream.MsgPack{
BeginTs: 0,
EndTs: 0,
Msgs: nil,
StartPositions: nil,
EndPositions: nil,
}
currentT := uint64(time.Now().Nanosecond())
msg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: nil,
MsgPosition: nil,
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: currentT,
SourceID: sourceID,
},
},
}
ttMsgs.Msgs = append(ttMsgs.Msgs, msg)
_ = ttStream.Produce(ttMsgs)
//log.Debug("ttStreamProduceLoop", zap.Any("Send", currentT))
}
}
}()
}
func TestSoftTimeTickBarrier_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
err := ttBarrier.Start()
assert.Equal(t, nil, err)
defer ttBarrier.Close()
}
func TestSoftTimeTickBarrier_Close(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
err := ttBarrier.Start()
assert.Equal(t, nil, err)
defer ttBarrier.Close()
}
func TestSoftTimeTickBarrier_GetTimeTick(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
err := ttBarrier.Start()
assert.Equal(t, nil, err)
defer ttBarrier.Close()
num := 10
for i := 0; i < num; i++ {
tick, err := ttBarrier.GetTimeTick()
assert.Equal(t, nil, err)
log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick))
}
}
func TestSoftTimeTickBarrier_AddPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
err := ttBarrier.Start()
assert.Equal(t, nil, err)
defer ttBarrier.Close()
newSourceID := UniqueID(2)
err = ttBarrier.AddPeer(newSourceID)
assert.Equal(t, nil, err)
ttStreamProduceLoop(ctx, ttStream, durationInterval, newSourceID)
num := 10
for i := 0; i < num; i++ {
tick, err := ttBarrier.GetTimeTick()
assert.Equal(t, nil, err)
log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick))
}
}
func TestSoftTimeTickBarrier_TickChan(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
err := ttBarrier.Start()
assert.Equal(t, nil, err)
defer ttBarrier.Close()
duration := time.Second
timer := time.NewTimer(duration)
for {
select {
case <-ctx.Done():
return
case <-timer.C:
return
case ts := <-ttBarrier.TickChan():
log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", ts))
}
}
}
package proxyservice
import (
"context"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
func TestTimeTick_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
channels := msgstream.NewSimpleMsgStream()
tick := newTimeTick(ctx, ttBarrier, channels)
err := tick.Start()
assert.Equal(t, nil, err)
defer tick.Close()
}
func TestTimeTick_Close(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ttStream := msgstream.NewSimpleMsgStream()
sourceID := 1
peerIds := []UniqueID{UniqueID(sourceID)}
interval := 100
minTtInterval := Timestamp(interval)
durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18
ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID))
ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval)
channels := msgstream.NewSimpleMsgStream()
tick := newTimeTick(ctx, ttBarrier, channels)
err := tick.Start()
assert.Equal(t, nil, err)
defer tick.Close()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册