提交 4a11a643 编写于 作者: G godchen 提交者: yefu.chen

Add time sync producer

Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 c9c8fb9d
......@@ -1216,25 +1216,28 @@ type TimeTickBarrier interface {
}
type timeSyncMsgProducer struct {
proxyTtBarrier TimeTickBarrier // softTimeTickBarrier
WriteNodeTtBarrier TimeTickBarrier //hardTimeTickBarrier
proxyTtBarrier TimeTickBarrier
//softTimeTickBarrier
writeNodeTtBarrier TimeTickBarrier
//hardTimeTickBarrier
dmSyncStream *MsgStream // insert & delete
k2sSyncStream *MsgStream
dmSyncStream MsgStream // insert & delete
k2sSyncStream MsgStream
ctx context.Context
cancel context.CancelFunc
}
func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error)
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtStreams(proxyTt *MsgStream, proxyIds []UniqueId)
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtStreams(WriteNodeTt *MsgStream, writeNodeIds []UniqueId)
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) {
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) {
func (syncMsgProducer *timeSyncMsgProducer) SetDmSyncStream(dmSyncStream *MsgStream)
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSyncStream *MsgStream)
func (syncMsgProducer *timeSyncMsgProducer) SetDmSyncStream(dmSyncStream MsgStream)
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSyncStream MsgStream)
func (syncMsgProducer *timeSyncMsgProducer) Start() error
func (syncMsgProducer *timeSyncMsgProducer) Close() error
func (syncMsgProducer *timeSyncMsgProducer) Close()
func newTimeSyncMsgProducer(ctx context.Context) *timeSyncMsgProducer error
```
......
package timesync
import (
"context"
"github.com/stretchr/testify/assert"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"log"
"testing"
"time"
)
type (
TestTickBarrier struct {
value int64
ctx context.Context
}
)
func (ttBarrier *TestTickBarrier) GetTimeTick() (Timestamp, error) {
time.Sleep(1 * time.Second)
ttBarrier.value++
return Timestamp(ttBarrier.value), nil
}
func (ttBarrier *TestTickBarrier) Start() error {
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
{
log.Printf("barrier context done, exit")
return
}
}
}
}(ttBarrier.ctx)
return nil
}
func (ttBarrier *TestTickBarrier) Close() {
_, cancel := context.WithCancel(context.Background())
cancel()
}
func initTestPulsarStream(ctx context.Context, pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string, opts ...ms.RepackFunc) (*ms.MsgStream, *ms.MsgStream) {
// set input stream
inputStream := ms.NewPulsarMsgStream(ctx, 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input ms.MsgStream = inputStream
// set output stream
outputStream := ms.NewPulsarMsgStream(ctx, 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
var output ms.MsgStream = outputStream
return &input, &output
}
func receiveMsg(stream *ms.MsgStream) []uint64 {
receiveCount := 0
var results []uint64
for {
result := (*stream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
timetickmsg := (*v).(*ms.TimeTickMsg)
results = append(results, timetickmsg.TimeTickMsg.Timestamp)
receiveCount++
if receiveCount == 10 {
return results
}
}
}
}
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"proxyTtBarrier"}
consumerChannels := []string{"proxyTtBarrier"}
consumerSubName := "proxyTtBarrier"
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
proxyTtInputStream, proxyTtOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName)
producerChannels = []string{"writeNodeBarrier"}
consumerChannels = []string{"writeNodeBarrier"}
consumerSubName = "writeNodeBarrier"
writeNodeInputStream, writeNodeOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName)
timeSyncProducer, _ := NewTimeSyncMsgProducer(ctx)
timeSyncProducer.SetProxyTtBarrier(&TestTickBarrier{ctx: ctx})
timeSyncProducer.SetWriteNodeTtBarrier(&TestTickBarrier{ctx: ctx})
timeSyncProducer.SetDMSyncStream(*proxyTtInputStream)
timeSyncProducer.SetK2sSyncStream(*writeNodeInputStream)
(*proxyTtOutputStream).Start()
(*writeNodeOutputStream).Start()
timeSyncProducer.Start()
expected := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result_1 := receiveMsg(proxyTtOutputStream)
assert.Equal(t, expected, result_1)
result_2 := receiveMsg(writeNodeOutputStream)
assert.Equal(t, expected, result_2)
timeSyncProducer.Close()
}
package timesync
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"log"
"context"
)
type timeSyncMsgProducer struct {
//softTimeTickBarrier
proxyTtBarrier TimeTickBarrier
//hardTimeTickBarrier
writeNodeTtBarrier TimeTickBarrier
dmSyncStream ms.MsgStream // insert & delete
k2sSyncStream ms.MsgStream
ctx context.Context
cancel context.CancelFunc
}
func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) {
ctx2, cancel := context.WithCancel(ctx)
return &timeSyncMsgProducer{ctx: ctx2, cancel: cancel}, nil
}
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) {
syncMsgProducer.proxyTtBarrier = proxyTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) {
syncMsgProducer.writeNodeTtBarrier = writeNodeTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetDMSyncStream(dmSync ms.MsgStream) {
syncMsgProducer.dmSyncStream = dmSync
}
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSync ms.MsgStream) {
syncMsgProducer.k2sSyncStream = k2sSync
}
func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier, stream ms.MsgStream) error {
for {
select {
case <-syncMsgProducer.ctx.Done():
{
log.Printf("broadcast context done, exit")
return errors.Errorf("broadcast done exit")
}
default:
timetick, err := barrier.GetTimeTick()
if err != nil {
log.Printf("broadcast get time tick error")
}
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
BeginTimestamp: timetick,
EndTimestamp: timetick,
HashValues: []int32{0},
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: 0,
Timestamp: timetick,
}
timeTickMsg := &ms.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
var tsMsg ms.TsMsg
tsMsg = timeTickMsg
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
err = stream.Broadcast(&msgPack)
if err != nil {
return err
}
}
}
}
func (syncMsgProducer *timeSyncMsgProducer) Start() error {
err := syncMsgProducer.proxyTtBarrier.Start()
if err != nil {
return err
}
err = syncMsgProducer.writeNodeTtBarrier.Start()
if err != nil {
return err
}
go syncMsgProducer.broadcastMsg(syncMsgProducer.proxyTtBarrier, syncMsgProducer.dmSyncStream)
go syncMsgProducer.broadcastMsg(syncMsgProducer.writeNodeTtBarrier, syncMsgProducer.k2sSyncStream)
return nil
}
func (syncMsgProducer *timeSyncMsgProducer) Close() {
syncMsgProducer.proxyTtBarrier.Close()
syncMsgProducer.writeNodeTtBarrier.Close()
syncMsgProducer.dmSyncStream.Close()
syncMsgProducer.k2sSyncStream.Close()
syncMsgProducer.cancel()
}
......@@ -39,11 +39,6 @@ func (dsService *dataSyncService) start() {
dsService.fg.Start()
}
func (dsService *dataSyncService) close() {
dsService.fg.Close()
(*dsService.dmStream).Close()
}
func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support
......
......@@ -15,10 +15,10 @@ func (fdmNode *filterDmNode) Name() string {
}
func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do filterDmNode operation")
// fmt.Println("Do filterDmNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
log.Println("Invalid operate message input in filterDmNode")
// TODO: add error handling
}
......
......@@ -29,7 +29,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// fmt.Println("Do insertNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in insertNode, input length = ", len(in))
log.Println("Invalid operate message input in insertNode")
// TODO: add error handling
}
......
......@@ -17,7 +17,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
// fmt.Println("Do serviceTimeNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in))
log.Println("Invalid operate message input in serviceTimeNode")
// TODO: add error handling
}
......
......@@ -61,7 +61,7 @@ func (ss *searchService) start() {
producerChannels := []string{"searchResult"}
searchResultStream := msgstream.NewPulsarMsgStream(ss.ctx, receiveBufSize)
searchResultStream := msgstream.NewPulsarMsgStream(context.Background(), receiveBufSize)
searchResultStream.SetPulsarCient(ss.pulsarURL)
searchResultStream.CreatePulsarProducers(producerChannels)
......
......@@ -66,10 +66,11 @@ func (fg *TimeTickedFlowGraph) Start() {
wg.Wait()
}
func (fg *TimeTickedFlowGraph) Close() {
func (fg *TimeTickedFlowGraph) Close() error {
for _, v := range fg.nodeCtx {
v.Close()
}
return nil
}
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
......
......@@ -2,7 +2,6 @@ package flowgraph
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"log"
)
type InputNode struct {
......@@ -25,16 +24,8 @@ func (inNode *InputNode) InStream() *msgstream.MsgStream {
// empty input and return one *Msg
func (inNode *InputNode) Operate(in []*Msg) []*Msg {
//fmt.Println("Do InputNode operation")
msgPack := (*inNode.inStream).Consume()
// TODO: add status
if msgPack == nil {
log.Println("null msg pack")
return nil
}
var msgStreamMsg Msg = &MsgStreamMsg{
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
......
......@@ -2,7 +2,6 @@ package flowgraph
import (
"context"
"fmt"
"log"
"sync"
)
......@@ -33,19 +32,17 @@ type nodeCtx struct {
func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
if (*nodeCtx.node).IsInputNode() {
fmt.Println("start InputNode.inStream")
inStream, ok := (*nodeCtx.node).(*InputNode)
if !ok {
log.Fatal("Invalid inputNode")
}
(*inStream.inStream).Start()
go (*inStream.inStream).Start()
}
for {
select {
case <-ctx.Done():
wg.Done()
fmt.Println((*nodeCtx.node).Name(), "closed")
return
default:
// inputs from inputsMessages for Operate
......@@ -55,25 +52,21 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
nodeCtx.collectInputMessages()
inputs = nodeCtx.inputMessages
}
n := *nodeCtx.node
res := n.Operate(inputs)
wg := sync.WaitGroup{}
downstreamLength := len(nodeCtx.downstreamInputChanIdx)
if len(nodeCtx.downstream) < downstreamLength {
log.Println("nodeCtx.downstream length = ", len(nodeCtx.downstream))
log.Fatal("nodeCtx.downstream length = ", len(nodeCtx.downstream))
}
if len(res) < downstreamLength {
log.Println("node result length = ", len(res))
break
log.Fatal("node result length = ", len(res))
}
w := sync.WaitGroup{}
for i := 0; i < downstreamLength; i++ {
w.Add(1)
go nodeCtx.downstream[i].ReceiveMsg(&w, res[i], nodeCtx.downstreamInputChanIdx[(*nodeCtx.downstream[i].node).Name()])
wg.Add(1)
go nodeCtx.downstream[i].ReceiveMsg(&wg, res[i], nodeCtx.downstreamInputChanIdx[(*nodeCtx.downstream[i].node).Name()])
}
w.Wait()
wg.Wait()
}
}
}
......@@ -81,13 +74,12 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
func (nodeCtx *nodeCtx) Close() {
for _, channel := range nodeCtx.inputChannels {
close(channel)
fmt.Println("close inputChannel")
}
}
func (nodeCtx *nodeCtx) ReceiveMsg(wg *sync.WaitGroup, msg *Msg, inputChanIdx int) {
nodeCtx.inputChannels[inputChanIdx] <- msg
//fmt.Println((*nodeCtx.node).Name(), "receive to input channel ", inputChanIdx)
// fmt.Println((*nodeCtx.node).Name(), "receive to input channel ", inputChanIdx)
wg.Done()
}
......@@ -101,13 +93,8 @@ func (nodeCtx *nodeCtx) collectInputMessages() {
// and move them to inputMessages.
for i := 0; i < inputsNum; i++ {
channel := nodeCtx.inputChannels[i]
msg, ok := <-channel
if !ok {
// TODO: add status
log.Println("input channel closed")
return
}
nodeCtx.inputMessages[i] = msg
msg := <-channel
nodeCtx.inputMessages = append(nodeCtx.inputMessages, msg)
}
}
......
......@@ -37,7 +37,7 @@
cd milvus-distributed
pwd_dir=`pwd`
export PATH=$PATH:$(go env GOPATH)/bin
export protoc=${pwd_dir}/internal/core/cmake_build/thirdparty/protobuf/protobuf-build/protoc
export protoc=${pwd_dir}/cmake_build/thirdparty/protobuf/protobuf-build/protoc
./ci/scripts/proto_gen_go.sh
```
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册