diff --git a/internal/msgstream/msgstream_mock.go b/internal/msgstream/msgstream_mock.go new file mode 100644 index 0000000000000000000000000000000000000000..146390adbef72c724b3fe4d7675f7ecb12ea7e4f --- /dev/null +++ b/internal/msgstream/msgstream_mock.go @@ -0,0 +1,65 @@ +package msgstream + +import "sync" + +type SimpleMsgStream struct { + msgChan chan *MsgPack + + msgCount int + msgCountMtx sync.RWMutex +} + +func (ms *SimpleMsgStream) Start() { +} + +func (ms *SimpleMsgStream) Close() { +} + +func (ms *SimpleMsgStream) Chan() <-chan *MsgPack { + return ms.msgChan +} + +func (ms *SimpleMsgStream) AsProducer(channels []string) { +} + +func (ms *SimpleMsgStream) AsConsumer(channels []string, subName string) { +} + +func (ms *SimpleMsgStream) SetRepackFunc(repackFunc RepackFunc) { +} + +func (ms *SimpleMsgStream) Produce(pack *MsgPack) error { + ms.msgCountMtx.Lock() + defer ms.msgCountMtx.Unlock() + + ms.msgChan <- pack + ms.msgCount++ + + return nil +} + +func (ms *SimpleMsgStream) Broadcast(pack *MsgPack) error { + return nil +} + +func (ms *SimpleMsgStream) Consume() *MsgPack { + ms.msgCountMtx.RLock() + defer ms.msgCountMtx.RUnlock() + + if ms.msgCount <= 0 { + return nil + } + + return <-ms.msgChan +} + +func (ms *SimpleMsgStream) Seek(offset *MsgPosition) error { + return nil +} + +func NewSimpleMsgStream() *SimpleMsgStream { + return &SimpleMsgStream{ + msgChan: make(chan *MsgPack, 1024), + msgCount: 0, + } +} diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 13d380083e3cc6717e554049c0e06023bbcdbac4..9f2ec6acaf150d44a9ce89f96df05f8b2f0f95c7 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -20,7 +20,7 @@ import ( type TaskQueue interface { utChan() <-chan int - UTEmpty() bool + utEmpty() bool utFull() bool addUnissuedTask(t task) error FrontUnissuedTask() task @@ -50,7 +50,7 @@ func (queue *BaseTaskQueue) utChan() <-chan int { return queue.utBufChan } -func (queue *BaseTaskQueue) UTEmpty() bool { +func (queue *BaseTaskQueue) utEmpty() bool { queue.utLock.Lock() defer queue.utLock.Unlock() return queue.unissuedTasks.Len() == 0 @@ -343,7 +343,7 @@ func (sched *TaskScheduler) definitionLoop() { case <-sched.ctx.Done(): return case <-sched.DdQueue.utChan(): - if !sched.DdQueue.UTEmpty() { + if !sched.DdQueue.utEmpty() { t := sched.scheduleDdTask() sched.processTask(t, sched.DdQueue) } @@ -358,7 +358,7 @@ func (sched *TaskScheduler) manipulationLoop() { case <-sched.ctx.Done(): return case <-sched.DmQueue.utChan(): - if !sched.DmQueue.UTEmpty() { + if !sched.DmQueue.utEmpty() { t := sched.scheduleDmTask() go sched.processTask(t, sched.DmQueue) } @@ -374,7 +374,7 @@ func (sched *TaskScheduler) queryLoop() { case <-sched.ctx.Done(): return case <-sched.DqQueue.utChan(): - if !sched.DqQueue.UTEmpty() { + if !sched.DqQueue.utEmpty() { t := sched.scheduleDqTask() go sched.processTask(t, sched.DqQueue) } else { diff --git a/internal/proxyservice/timesync_test.go b/internal/proxyservice/timesync_test.go new file mode 100644 index 0000000000000000000000000000000000000000..19a66ce1c3651e9571f22f293895261fdf229b2a --- /dev/null +++ b/internal/proxyservice/timesync_test.go @@ -0,0 +1,192 @@ +package proxyservice + +import ( + "context" + "math" + "testing" + "time" + + "github.com/zilliztech/milvus-distributed/internal/log" + "go.uber.org/zap" + + "github.com/stretchr/testify/assert" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" +) + +func ttStreamProduceLoop(ctx context.Context, ttStream msgstream.MsgStream, durationInterval time.Duration, sourceID int64) { + log.Debug("ttStreamProduceLoop", zap.Any("durationInterval", durationInterval)) + timer := time.NewTicker(durationInterval) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + ttMsgs := &msgstream.MsgPack{ + BeginTs: 0, + EndTs: 0, + Msgs: nil, + StartPositions: nil, + EndPositions: nil, + } + + currentT := uint64(time.Now().Nanosecond()) + msg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + Ctx: ctx, + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: nil, + MsgPosition: nil, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: 0, + MsgID: 0, + Timestamp: currentT, + SourceID: sourceID, + }, + }, + } + + ttMsgs.Msgs = append(ttMsgs.Msgs, msg) + + _ = ttStream.Produce(ttMsgs) + //log.Debug("ttStreamProduceLoop", zap.Any("Send", currentT)) + } + } + }() +} + +func TestSoftTimeTickBarrier_Start(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + err := ttBarrier.Start() + assert.Equal(t, nil, err) + defer ttBarrier.Close() +} + +func TestSoftTimeTickBarrier_Close(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + err := ttBarrier.Start() + assert.Equal(t, nil, err) + defer ttBarrier.Close() +} + +func TestSoftTimeTickBarrier_GetTimeTick(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + err := ttBarrier.Start() + assert.Equal(t, nil, err) + defer ttBarrier.Close() + + num := 10 + for i := 0; i < num; i++ { + tick, err := ttBarrier.GetTimeTick() + assert.Equal(t, nil, err) + log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick)) + } +} + +func TestSoftTimeTickBarrier_AddPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + err := ttBarrier.Start() + assert.Equal(t, nil, err) + defer ttBarrier.Close() + + newSourceID := UniqueID(2) + err = ttBarrier.AddPeer(newSourceID) + assert.Equal(t, nil, err) + ttStreamProduceLoop(ctx, ttStream, durationInterval, newSourceID) + + num := 10 + for i := 0; i < num; i++ { + tick, err := ttBarrier.GetTimeTick() + assert.Equal(t, nil, err) + log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", tick)) + } +} + +func TestSoftTimeTickBarrier_TickChan(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + err := ttBarrier.Start() + assert.Equal(t, nil, err) + defer ttBarrier.Close() + + duration := time.Second + timer := time.NewTimer(duration) + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + return + case ts := <-ttBarrier.TickChan(): + log.Debug("TestSoftTimeTickBarrier", zap.Any("GetTimeTick", ts)) + } + } +} diff --git a/internal/proxyservice/timetick_test.go b/internal/proxyservice/timetick_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9f0c2b6cc62b043445893dd817ce07d620a1273d --- /dev/null +++ b/internal/proxyservice/timetick_test.go @@ -0,0 +1,55 @@ +package proxyservice + +import ( + "context" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/msgstream" +) + +func TestTimeTick_Start(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + channels := msgstream.NewSimpleMsgStream() + + tick := newTimeTick(ctx, ttBarrier, channels) + err := tick.Start() + assert.Equal(t, nil, err) + defer tick.Close() +} + +func TestTimeTick_Close(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ttStream := msgstream.NewSimpleMsgStream() + sourceID := 1 + peerIds := []UniqueID{UniqueID(sourceID)} + interval := 100 + minTtInterval := Timestamp(interval) + + durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 + ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) + + ttBarrier := newSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) + channels := msgstream.NewSimpleMsgStream() + + tick := newTimeTick(ctx, ttBarrier, channels) + err := tick.Start() + assert.Equal(t, nil, err) + defer tick.Close() +}