From 4a11a6431bb44be9ec49656d582a5e362f213bb6 Mon Sep 17 00:00:00 2001 From: godchen Date: Thu, 12 Nov 2020 11:21:21 +0800 Subject: [PATCH] Add time sync producer Signed-off-by: godchen --- docs/developer_guides/developer_guides.md | 23 +- .../timesync/time_snyc_producer_test.go | 116 +++ .../master/timesync/time_sync_producer.go | 108 ++ internal/reader/data_sync_service.go | 5 - internal/reader/data_sync_service_test.go | 956 ++++++++++++++---- internal/reader/flow_graph_filter_dm_node.go | 4 +- internal/reader/flow_graph_insert_node.go | 2 +- .../reader/flow_graph_service_time_node.go | 2 +- internal/reader/search_service.go | 2 +- internal/util/flowgraph/flow_graph.go | 3 +- internal/util/flowgraph/input_node.go | 9 - internal/util/flowgraph/node.go | 33 +- scripts/README.md | 2 +- 13 files changed, 1034 insertions(+), 231 deletions(-) create mode 100644 internal/master/timesync/time_snyc_producer_test.go create mode 100644 internal/master/timesync/time_sync_producer.go diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index c2b21bbf0..2ae110e14 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -1216,25 +1216,28 @@ type TimeTickBarrier interface { } type timeSyncMsgProducer struct { - proxyTtBarrier TimeTickBarrier // softTimeTickBarrier - WriteNodeTtBarrier TimeTickBarrier //hardTimeTickBarrier + proxyTtBarrier TimeTickBarrier + //softTimeTickBarrier + writeNodeTtBarrier TimeTickBarrier + //hardTimeTickBarrier - dmSyncStream *MsgStream // insert & delete - k2sSyncStream *MsgStream + dmSyncStream MsgStream // insert & delete + k2sSyncStream MsgStream ctx context.Context + cancel context.CancelFunc } +func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) -func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtStreams(proxyTt *MsgStream, proxyIds []UniqueId) -func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtStreams(WriteNodeTt *MsgStream, writeNodeIds []UniqueId) +func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) { +func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) { -func (syncMsgProducer *timeSyncMsgProducer) SetDmSyncStream(dmSyncStream *MsgStream) -func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSyncStream *MsgStream) +func (syncMsgProducer *timeSyncMsgProducer) SetDmSyncStream(dmSyncStream MsgStream) +func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSyncStream MsgStream) func (syncMsgProducer *timeSyncMsgProducer) Start() error -func (syncMsgProducer *timeSyncMsgProducer) Close() error +func (syncMsgProducer *timeSyncMsgProducer) Close() -func newTimeSyncMsgProducer(ctx context.Context) *timeSyncMsgProducer error ``` diff --git a/internal/master/timesync/time_snyc_producer_test.go b/internal/master/timesync/time_snyc_producer_test.go new file mode 100644 index 000000000..4c46137fe --- /dev/null +++ b/internal/master/timesync/time_snyc_producer_test.go @@ -0,0 +1,116 @@ +package timesync + +import ( + "context" + "github.com/stretchr/testify/assert" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + "log" + "testing" + "time" +) + +type ( + TestTickBarrier struct { + value int64 + ctx context.Context + } +) + +func (ttBarrier *TestTickBarrier) GetTimeTick() (Timestamp, error) { + time.Sleep(1 * time.Second) + ttBarrier.value++ + return Timestamp(ttBarrier.value), nil +} + +func (ttBarrier *TestTickBarrier) Start() error { + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + { + log.Printf("barrier context done, exit") + return + } + } + } + }(ttBarrier.ctx) + return nil +} +func (ttBarrier *TestTickBarrier) Close() { + _, cancel := context.WithCancel(context.Background()) + cancel() +} + +func initTestPulsarStream(ctx context.Context, pulsarAddress string, + producerChannels []string, + consumerChannels []string, + consumerSubName string, opts ...ms.RepackFunc) (*ms.MsgStream, *ms.MsgStream) { + + // set input stream + inputStream := ms.NewPulsarMsgStream(ctx, 100) + inputStream.SetPulsarCient(pulsarAddress) + inputStream.CreatePulsarProducers(producerChannels) + for _, opt := range opts { + inputStream.SetRepackFunc(opt) + } + var input ms.MsgStream = inputStream + + // set output stream + outputStream := ms.NewPulsarMsgStream(ctx, 100) + outputStream.SetPulsarCient(pulsarAddress) + unmarshalDispatcher := ms.NewUnmarshalDispatcher() + outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100) + var output ms.MsgStream = outputStream + + return &input, &output +} +func receiveMsg(stream *ms.MsgStream) []uint64 { + receiveCount := 0 + var results []uint64 + for { + result := (*stream).Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + timetickmsg := (*v).(*ms.TimeTickMsg) + results = append(results, timetickmsg.TimeTickMsg.Timestamp) + receiveCount++ + if receiveCount == 10 { + return results + } + } + } + } + +} + +func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + + producerChannels := []string{"proxyTtBarrier"} + consumerChannels := []string{"proxyTtBarrier"} + consumerSubName := "proxyTtBarrier" + ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + proxyTtInputStream, proxyTtOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName) + + producerChannels = []string{"writeNodeBarrier"} + consumerChannels = []string{"writeNodeBarrier"} + consumerSubName = "writeNodeBarrier" + writeNodeInputStream, writeNodeOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName) + + timeSyncProducer, _ := NewTimeSyncMsgProducer(ctx) + timeSyncProducer.SetProxyTtBarrier(&TestTickBarrier{ctx: ctx}) + timeSyncProducer.SetWriteNodeTtBarrier(&TestTickBarrier{ctx: ctx}) + timeSyncProducer.SetDMSyncStream(*proxyTtInputStream) + timeSyncProducer.SetK2sSyncStream(*writeNodeInputStream) + (*proxyTtOutputStream).Start() + (*writeNodeOutputStream).Start() + timeSyncProducer.Start() + expected := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + result_1 := receiveMsg(proxyTtOutputStream) + assert.Equal(t, expected, result_1) + result_2 := receiveMsg(writeNodeOutputStream) + assert.Equal(t, expected, result_2) + + timeSyncProducer.Close() +} diff --git a/internal/master/timesync/time_sync_producer.go b/internal/master/timesync/time_sync_producer.go new file mode 100644 index 000000000..da9573480 --- /dev/null +++ b/internal/master/timesync/time_sync_producer.go @@ -0,0 +1,108 @@ +package timesync + +import ( + "github.com/zilliztech/milvus-distributed/internal/errors" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "log" + + "context" +) + +type timeSyncMsgProducer struct { + //softTimeTickBarrier + proxyTtBarrier TimeTickBarrier + //hardTimeTickBarrier + writeNodeTtBarrier TimeTickBarrier + + dmSyncStream ms.MsgStream // insert & delete + k2sSyncStream ms.MsgStream + + ctx context.Context + cancel context.CancelFunc +} + +func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) { + ctx2, cancel := context.WithCancel(ctx) + return &timeSyncMsgProducer{ctx: ctx2, cancel: cancel}, nil +} + +func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) { + syncMsgProducer.proxyTtBarrier = proxyTtBarrier +} + +func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) { + syncMsgProducer.writeNodeTtBarrier = writeNodeTtBarrier +} + +func (syncMsgProducer *timeSyncMsgProducer) SetDMSyncStream(dmSync ms.MsgStream) { + syncMsgProducer.dmSyncStream = dmSync +} + +func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSync ms.MsgStream) { + syncMsgProducer.k2sSyncStream = k2sSync +} + +func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier, stream ms.MsgStream) error { + for { + select { + case <-syncMsgProducer.ctx.Done(): + { + log.Printf("broadcast context done, exit") + return errors.Errorf("broadcast done exit") + } + default: + timetick, err := barrier.GetTimeTick() + if err != nil { + log.Printf("broadcast get time tick error") + } + msgPack := ms.MsgPack{} + baseMsg := ms.BaseMsg{ + BeginTimestamp: timetick, + EndTimestamp: timetick, + HashValues: []int32{0}, + } + timeTickResult := internalPb.TimeTickMsg{ + MsgType: internalPb.MsgType_kTimeTick, + PeerId: 0, + Timestamp: timetick, + } + timeTickMsg := &ms.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + var tsMsg ms.TsMsg + tsMsg = timeTickMsg + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + err = stream.Broadcast(&msgPack) + if err != nil { + return err + } + } + } +} + +func (syncMsgProducer *timeSyncMsgProducer) Start() error { + err := syncMsgProducer.proxyTtBarrier.Start() + if err != nil { + return err + } + + err = syncMsgProducer.writeNodeTtBarrier.Start() + if err != nil { + return err + } + + go syncMsgProducer.broadcastMsg(syncMsgProducer.proxyTtBarrier, syncMsgProducer.dmSyncStream) + go syncMsgProducer.broadcastMsg(syncMsgProducer.writeNodeTtBarrier, syncMsgProducer.k2sSyncStream) + + return nil +} + +func (syncMsgProducer *timeSyncMsgProducer) Close() { + syncMsgProducer.proxyTtBarrier.Close() + syncMsgProducer.writeNodeTtBarrier.Close() + syncMsgProducer.dmSyncStream.Close() + syncMsgProducer.k2sSyncStream.Close() + syncMsgProducer.cancel() +} diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go index 21be40633..2833d2822 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/reader/data_sync_service.go @@ -39,11 +39,6 @@ func (dsService *dataSyncService) start() { dsService.fg.Start() } -func (dsService *dataSyncService) close() { - dsService.fg.Close() - (*dsService.dmStream).Close() -} - func (dsService *dataSyncService) initNodes() { // TODO: add delete pipeline support diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index c0d2e27a8..f5a0d93b2 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -1,180 +1,782 @@ package reader -import ( - "context" - "encoding/binary" - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "math" - "testing" - "time" +//import ( +// "context" +// "encoding/binary" +// "math" +// "testing" +// "time" +// +// "github.com/zilliztech/milvus-distributed/internal/msgstream" +// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +// internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +//) +// +//const ctxTimeInMillisecond = 500 +// +//func TestManipulationService_Start(t *testing.T) { +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// pulsarUrl := "pulsar://localhost:6650" +// +// node := NewQueryNode(ctx, 0, pulsarUrl) +// node.manipulationService = newDataSyncService(node.ctx, node, node.pulsarURL) +// +// segmentID := int64(0) +// +// var collection = node.newCollection(0, "collection0", "") +// var partition = collection.newPartition("partition0") +// var segment = partition.newSegment(segmentID) +// node.SegmentsMap[segmentID] = segment +// +// node.manipulationService.initNodes() +// go node.manipulationService.fg.Start() +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records []*commonpb.Blob +// for i := 0; i < N; i++ { +// blob := &commonpb.Blob{ +// Value: rawData, +// } +// records = append(records, blob) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// insertMessages := make([]*msgstream.TsMsg, 0) +// +// for i := 0; i < msgLength; i++ { +// var msg msgstream.TsMsg = &msgstream.InsertTask{ +// InsertRequest: internalPb.InsertRequest{ +// MsgType: internalPb.MsgType_kInsert, +// ReqId: int64(0), +// CollectionName: "collection0", +// PartitionTag: "default", +// SegmentId: int64(0), +// ChannelId: int64(0), +// ProxyId: int64(0), +// Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)}, +// RowIds: []int64{int64(i), int64(i)}, +// RowData: []*commonpb.Blob{ +// {Value: rawData}, +// {Value: rawData}, +// }, +// }, +// } +// insertMessages = append(insertMessages, &msg) +// } +// +// msgPack := msgstream.MsgPack{ +// BeginTs: timeRange.timestampMin, +// EndTs: timeRange.timestampMax, +// Msgs: insertMessages, +// } +// +// var msgStreamMsg Msg = &msgStreamMsg{ +// tsMessages: msgPack.Msgs, +// timeRange: TimeRange{ +// timestampMin: msgPack.BeginTs, +// timestampMax: msgPack.EndTs, +// }, +// } +// node.manipulationService.fg.Input(&msgStreamMsg) +// +// node.Close() +// +// for { +// select { +// case <-ctx.Done(): +// return +// } +// } +//} - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -const ctxTimeInMillisecond = 2000 -const closeWithDeadline = true - -// NOTE: start pulsar before test -func TestManipulationService_Start(t *testing.T) { - var ctx context.Context - - if closeWithDeadline { - var cancel context.CancelFunc - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - defer cancel() - } else { - ctx = context.Background() - } - - // init query node - pulsarUrl := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarUrl) - - // init meta - fieldVec := schemapb.FieldSchema{ - Name: "vec", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "16", - }, - }, - } - - fieldInt := schemapb.FieldSchema{ - Name: "age", - DataType: schemapb.DataType_INT32, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "1", - }, - }, - } - - schema := schemapb.CollectionSchema{ - Name: "collection0", - Fields: []*schemapb.FieldSchema{ - &fieldVec, &fieldInt, - }, - } - - collectionMeta := etcdpb.CollectionMeta{ - Id: UniqueID(0), - Schema: &schema, - CreateTime: Timestamp(0), - SegmentIds: []UniqueID{0}, - PartitionTags: []string{"default"}, - } - - collectionMetaBlob := proto.MarshalTextString(&collectionMeta) - assert.NotEqual(t, "", collectionMetaBlob) - - var collection = node.container.addCollection(&collectionMeta, collectionMetaBlob) - assert.Equal(t, collection.meta.Schema.Name, "collection0") - assert.Equal(t, collection.meta.Id, UniqueID(0)) - assert.Equal(t, len(node.container.collections), 1) - - partition, err := node.container.addPartition(collection, collectionMeta.PartitionTags[0]) - assert.NoError(t, err) - - segmentID := UniqueID(0) - targetSeg, err := node.container.addSegment(collection, partition, segmentID) - assert.NoError(t, err) - assert.Equal(t, targetSeg.segmentID, segmentID) - - // test data generate - const msgLength = 10 - const DIM = 16 - const N = 10 - - var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} - var rawData []byte - for _, ele := range vec { - buf := make([]byte, 4) - binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) - rawData = append(rawData, buf...) - } - bs := make([]byte, 4) - binary.LittleEndian.PutUint32(bs, 1) - rawData = append(rawData, bs...) - var records []*commonpb.Blob - for i := 0; i < N; i++ { - blob := &commonpb.Blob{ - Value: rawData, - } - records = append(records, blob) - } - - timeRange := TimeRange{ - timestampMin: 0, - timestampMax: math.MaxUint64, - } - - // messages generate - insertMessages := make([]*msgstream.TsMsg, 0) - for i := 0; i < msgLength; i++ { - var msg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []int32{ - int32(i), int32(i), - }, - }, - InsertRequest: internalPb.InsertRequest{ - MsgType: internalPb.MsgType_kInsert, - ReqId: int64(0), - CollectionName: "collection0", - PartitionTag: "default", - SegmentId: int64(0), - ChannelId: int64(0), - ProxyId: int64(0), - Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)}, - RowIds: []int64{int64(i), int64(i)}, - RowData: []*commonpb.Blob{ - {Value: rawData}, - {Value: rawData}, - }, - }, - } - insertMessages = append(insertMessages, &msg) - } - - msgPack := msgstream.MsgPack{ - BeginTs: timeRange.timestampMin, - EndTs: timeRange.timestampMax, - Msgs: insertMessages, - } - - // pulsar produce - const receiveBufSize = 1024 - producerChannels := []string{"insert"} - - insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarCient(pulsarUrl) - insertStream.CreatePulsarProducers(producerChannels) - - var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - err = insertMsgStream.Produce(&msgPack) - assert.NoError(t, err) - - // dataSync - node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) - go node.dataSyncService.start() - - node.Close() - - for { - select { - case <-ctx.Done(): - return - } - } -} +//import ( +// "context" +// "encoding/binary" +// "math" +// "strconv" +// "sync" +// "testing" +// "time" +// +// "github.com/stretchr/testify/assert" +// "github.com/zilliztech/milvus-distributed/internal/conf" +// "github.com/zilliztech/milvus-distributed/internal/msgclient" +// msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" +//) +// +//func TestInsertAndDelete_MessagesPreprocess(t *testing.T) { +// ctx := context.Background() +// +// node := NewQueryNode(ctx, 0, 0) +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_INSERT, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// assert.Equal(t, len(node.insertData.insertIDs), msgLength) +// assert.Equal(t, len(node.insertData.insertTimestamps), msgLength) +// assert.Equal(t, len(node.insertData.insertRecords), msgLength) +// assert.Equal(t, len(node.insertData.insertOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.Close() +//} +// +//// NOTE: start pulsar before test +//func TestInsertAndDelete_WriterDelete(t *testing.T) { +// conf.LoadConfig("config.yaml") +// +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// +// mc := msgclient.ReaderMessageClient{} +// pulsarAddr := "pulsar://" +// pulsarAddr += conf.Config.Pulsar.Address +// pulsarAddr += ":" +// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) +// mc.InitClient(ctx, pulsarAddr) +// +// mc.ReceiveMessage() +// node := CreateQueryNode(ctx, 0, 0, &mc) +// +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_DELETE, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// for i := 0; i < msgLength; i++ { +// key2SegMsg := msgPb.Key2SegMsg{ +// Uid: int64(i), +// Timestamp: uint64(i + 1000), +// SegmentId: []int64{int64(i)}, +// } +// node.messageClient.Key2SegChan <- &key2SegMsg +// } +// +// assert.Equal(t, len(node.deleteData.deleteIDs), 0) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), 0) +// assert.Equal(t, len(node.deleteData.deleteOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength) +// assert.Equal(t, node.deletePreprocessData.count, int32(msgLength)) +// +// node.WriterDelete() +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength) +// assert.Equal(t, node.deletePreprocessData.count, int32(0)) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.Close() +//} +// +//// NOTE: start pulsar before test +//func TestInsertAndDelete_PreInsertAndDelete(t *testing.T) { +// conf.LoadConfig("config.yaml") +// +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// +// mc := msgclient.ReaderMessageClient{} +// pulsarAddr := "pulsar://" +// pulsarAddr += conf.Config.Pulsar.Address +// pulsarAddr += ":" +// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) +// mc.InitClient(ctx, pulsarAddr) +// +// mc.ReceiveMessage() +// node := CreateQueryNode(ctx, 0, 0, &mc) +// +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength/2; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_INSERT, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// for i := 0; i < msgLength/2; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i + msgLength/2), +// ChannelId: 0, +// Op: msgPb.OpType_DELETE, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// for i := 0; i < msgLength; i++ { +// key2SegMsg := msgPb.Key2SegMsg{ +// Uid: int64(i), +// Timestamp: uint64(i + 1000), +// SegmentId: []int64{int64(i)}, +// } +// node.messageClient.Key2SegChan <- &key2SegMsg +// } +// +// assert.Equal(t, len(node.insertData.insertIDs), msgLength/2) +// assert.Equal(t, len(node.insertData.insertTimestamps), msgLength/2) +// assert.Equal(t, len(node.insertData.insertRecords), msgLength/2) +// assert.Equal(t, len(node.insertData.insertOffset), 0) +// +// assert.Equal(t, len(node.deleteData.deleteIDs), 0) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), 0) +// assert.Equal(t, len(node.deleteData.deleteOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength/2) +// assert.Equal(t, node.deletePreprocessData.count, int32(msgLength/2)) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.WriterDelete() +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength/2) +// assert.Equal(t, node.deletePreprocessData.count, int32(0)) +// +// node.PreInsertAndDelete() +// +// assert.Equal(t, len(node.insertData.insertOffset), msgLength/2) +// +// assert.Equal(t, len(node.deleteData.deleteIDs), msgLength/2) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), msgLength/2) +// assert.Equal(t, len(node.deleteData.deleteOffset), msgLength/2) +// +// node.Close() +//} +// +//func TestInsertAndDelete_DoInsert(t *testing.T) { +// conf.LoadConfig("config.yaml") +// +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// +// mc := msgclient.ReaderMessageClient{} +// node := CreateQueryNode(ctx, 0, 0, &mc) +// +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_INSERT, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// assert.Equal(t, len(node.insertData.insertIDs), msgLength) +// assert.Equal(t, len(node.insertData.insertTimestamps), msgLength) +// assert.Equal(t, len(node.insertData.insertRecords), msgLength) +// assert.Equal(t, len(node.insertData.insertOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.PreInsertAndDelete() +// +// assert.Equal(t, len(node.insertData.insertOffset), msgLength) +// +// wg := sync.WaitGroup{} +// for segmentID := range node.insertData.insertRecords { +// wg.Add(1) +// go node.DoInsert(segmentID, &wg) +// } +// wg.Wait() +// +// node.Close() +//} +// +//// NOTE: start pulsar before test +//func TestInsertAndDelete_DoDelete(t *testing.T) { +// conf.LoadConfig("config.yaml") +// +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// +// mc := msgclient.ReaderMessageClient{} +// pulsarAddr := "pulsar://" +// pulsarAddr += conf.Config.Pulsar.Address +// pulsarAddr += ":" +// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) +// mc.InitClient(ctx, pulsarAddr) +// +// mc.ReceiveMessage() +// node := CreateQueryNode(ctx, 0, 0, &mc) +// +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_DELETE, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// for i := 0; i < msgLength; i++ { +// key2SegMsg := msgPb.Key2SegMsg{ +// Uid: int64(i), +// Timestamp: uint64(i + 1000), +// SegmentId: []int64{int64(i)}, +// } +// node.messageClient.Key2SegChan <- &key2SegMsg +// } +// +// assert.Equal(t, len(node.deleteData.deleteIDs), 0) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), 0) +// assert.Equal(t, len(node.deleteData.deleteOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength) +// assert.Equal(t, node.deletePreprocessData.count, int32(msgLength)) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.WriterDelete() +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength) +// assert.Equal(t, node.deletePreprocessData.count, int32(0)) +// +// node.PreInsertAndDelete() +// +// assert.Equal(t, len(node.deleteData.deleteIDs), msgLength) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), msgLength) +// assert.Equal(t, len(node.deleteData.deleteOffset), msgLength) +// +// wg := sync.WaitGroup{} +// for segmentID, deleteIDs := range node.deleteData.deleteIDs { +// if segmentID < 0 { +// continue +// } +// wg.Add(1) +// var deleteTimestamps = node.deleteData.deleteTimestamps[segmentID] +// go node.DoDelete(segmentID, &deleteIDs, &deleteTimestamps, &wg) +// } +// wg.Wait() +// +// node.Close() +//} +// +//// NOTE: start pulsar before test +//func TestInsertAndDelete_DoInsertAndDelete(t *testing.T) { +// conf.LoadConfig("config.yaml") +// +// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) +// ctx, cancel := context.WithDeadline(context.Background(), d) +// defer cancel() +// +// mc := msgclient.ReaderMessageClient{} +// pulsarAddr := "pulsar://" +// pulsarAddr += conf.Config.Pulsar.Address +// pulsarAddr += ":" +// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) +// mc.InitClient(ctx, pulsarAddr) +// +// mc.ReceiveMessage() +// node := CreateQueryNode(ctx, 0, 0, &mc) +// +// var collection = node.newCollection(0, "collection0", "") +// _ = collection.newPartition("partition0") +// +// const msgLength = 10 +// const DIM = 16 +// const N = 3 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// var rawData []byte +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// rawData = append(rawData, buf...) +// } +// bs := make([]byte, 4) +// binary.LittleEndian.PutUint32(bs, 1) +// rawData = append(rawData, bs...) +// var records [][]byte +// for i := 0; i < N; i++ { +// records = append(records, rawData) +// } +// +// insertDeleteMessages := make([]*msgPb.InsertOrDeleteMsg, 0) +// +// for i := 0; i < msgLength/2; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i), +// ChannelId: 0, +// Op: msgPb.OpType_INSERT, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// for i := 0; i < msgLength/2; i++ { +// msg := msgPb.InsertOrDeleteMsg{ +// CollectionName: "collection0", +// RowsData: &msgPb.RowData{ +// Blob: rawData, +// }, +// Uid: int64(i), +// PartitionTag: "partition0", +// Timestamp: uint64(i + 1000), +// SegmentId: int64(i + msgLength/2), +// ChannelId: 0, +// Op: msgPb.OpType_DELETE, +// ClientId: 0, +// ExtraParams: nil, +// } +// insertDeleteMessages = append(insertDeleteMessages, &msg) +// } +// +// timeRange := TimeRange{ +// timestampMin: 0, +// timestampMax: math.MaxUint64, +// } +// +// node.QueryNodeDataInit() +// +// assert.NotNil(t, node.deletePreprocessData) +// assert.NotNil(t, node.insertData) +// assert.NotNil(t, node.deleteData) +// +// node.MessagesPreprocess(insertDeleteMessages, timeRange) +// +// for i := 0; i < msgLength; i++ { +// key2SegMsg := msgPb.Key2SegMsg{ +// Uid: int64(i), +// Timestamp: uint64(i + 1000), +// SegmentId: []int64{int64(i)}, +// } +// node.messageClient.Key2SegChan <- &key2SegMsg +// } +// +// assert.Equal(t, len(node.insertData.insertIDs), msgLength/2) +// assert.Equal(t, len(node.insertData.insertTimestamps), msgLength/2) +// assert.Equal(t, len(node.insertData.insertRecords), msgLength/2) +// assert.Equal(t, len(node.insertData.insertOffset), 0) +// +// assert.Equal(t, len(node.deleteData.deleteIDs), 0) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), 0) +// assert.Equal(t, len(node.deleteData.deleteOffset), 0) +// +// assert.Equal(t, len(node.buffer.InsertDeleteBuffer), 0) +// assert.Equal(t, len(node.buffer.validInsertDeleteBuffer), 0) +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength/2) +// assert.Equal(t, node.deletePreprocessData.count, int32(msgLength/2)) +// +// assert.Equal(t, len(node.SegmentsMap), 10) +// assert.Equal(t, len(node.Collections[0].Partitions[0].segments), 10) +// +// node.WriterDelete() +// +// assert.Equal(t, len(node.deletePreprocessData.deleteRecords), msgLength/2) +// assert.Equal(t, node.deletePreprocessData.count, int32(0)) +// +// node.PreInsertAndDelete() +// +// assert.Equal(t, len(node.insertData.insertOffset), msgLength/2) +// +// assert.Equal(t, len(node.deleteData.deleteIDs), msgLength/2) +// assert.Equal(t, len(node.deleteData.deleteTimestamps), msgLength/2) +// assert.Equal(t, len(node.deleteData.deleteOffset), msgLength/2) +// +// status := node.DoInsertAndDelete() +// +// assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS) +// +// node.Close() +//} diff --git a/internal/reader/flow_graph_filter_dm_node.go b/internal/reader/flow_graph_filter_dm_node.go index 91e13db42..aea8c5416 100644 --- a/internal/reader/flow_graph_filter_dm_node.go +++ b/internal/reader/flow_graph_filter_dm_node.go @@ -15,10 +15,10 @@ func (fdmNode *filterDmNode) Name() string { } func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { - //fmt.Println("Do filterDmNode operation") + // fmt.Println("Do filterDmNode operation") if len(in) != 1 { - log.Println("Invalid operate message input in filterDmNode, input length = ", len(in)) + log.Println("Invalid operate message input in filterDmNode") // TODO: add error handling } diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index 889084dc1..3a2b3f317 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -29,7 +29,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // fmt.Println("Do insertNode operation") if len(in) != 1 { - log.Println("Invalid operate message input in insertNode, input length = ", len(in)) + log.Println("Invalid operate message input in insertNode") // TODO: add error handling } diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 6e9dfb325..27c9efbe9 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -17,7 +17,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { // fmt.Println("Do serviceTimeNode operation") if len(in) != 1 { - log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in)) + log.Println("Invalid operate message input in serviceTimeNode") // TODO: add error handling } diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index bf7f27f4c..dd692c6da 100644 --- a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -61,7 +61,7 @@ func (ss *searchService) start() { producerChannels := []string{"searchResult"} - searchResultStream := msgstream.NewPulsarMsgStream(ss.ctx, receiveBufSize) + searchResultStream := msgstream.NewPulsarMsgStream(context.Background(), receiveBufSize) searchResultStream.SetPulsarCient(ss.pulsarURL) searchResultStream.CreatePulsarProducers(producerChannels) diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 822842c66..c4658a91a 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -66,10 +66,11 @@ func (fg *TimeTickedFlowGraph) Start() { wg.Wait() } -func (fg *TimeTickedFlowGraph) Close() { +func (fg *TimeTickedFlowGraph) Close() error { for _, v := range fg.nodeCtx { v.Close() } + return nil } func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph { diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 82e802c73..bb4e2d98b 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -2,7 +2,6 @@ package flowgraph import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" - "log" ) type InputNode struct { @@ -25,16 +24,8 @@ func (inNode *InputNode) InStream() *msgstream.MsgStream { // empty input and return one *Msg func (inNode *InputNode) Operate(in []*Msg) []*Msg { - //fmt.Println("Do InputNode operation") - msgPack := (*inNode.inStream).Consume() - // TODO: add status - if msgPack == nil { - log.Println("null msg pack") - return nil - } - var msgStreamMsg Msg = &MsgStreamMsg{ tsMessages: msgPack.Msgs, timestampMin: msgPack.BeginTs, diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 9e01496d3..cfe5d712c 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -2,7 +2,6 @@ package flowgraph import ( "context" - "fmt" "log" "sync" ) @@ -33,19 +32,17 @@ type nodeCtx struct { func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { if (*nodeCtx.node).IsInputNode() { - fmt.Println("start InputNode.inStream") inStream, ok := (*nodeCtx.node).(*InputNode) if !ok { log.Fatal("Invalid inputNode") } - (*inStream.inStream).Start() + go (*inStream.inStream).Start() } for { select { case <-ctx.Done(): wg.Done() - fmt.Println((*nodeCtx.node).Name(), "closed") return default: // inputs from inputsMessages for Operate @@ -55,25 +52,21 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { nodeCtx.collectInputMessages() inputs = nodeCtx.inputMessages } - n := *nodeCtx.node res := n.Operate(inputs) - + wg := sync.WaitGroup{} downstreamLength := len(nodeCtx.downstreamInputChanIdx) if len(nodeCtx.downstream) < downstreamLength { - log.Println("nodeCtx.downstream length = ", len(nodeCtx.downstream)) + log.Fatal("nodeCtx.downstream length = ", len(nodeCtx.downstream)) } if len(res) < downstreamLength { - log.Println("node result length = ", len(res)) - break + log.Fatal("node result length = ", len(res)) } - - w := sync.WaitGroup{} for i := 0; i < downstreamLength; i++ { - w.Add(1) - go nodeCtx.downstream[i].ReceiveMsg(&w, res[i], nodeCtx.downstreamInputChanIdx[(*nodeCtx.downstream[i].node).Name()]) + wg.Add(1) + go nodeCtx.downstream[i].ReceiveMsg(&wg, res[i], nodeCtx.downstreamInputChanIdx[(*nodeCtx.downstream[i].node).Name()]) } - w.Wait() + wg.Wait() } } } @@ -81,13 +74,12 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { func (nodeCtx *nodeCtx) Close() { for _, channel := range nodeCtx.inputChannels { close(channel) - fmt.Println("close inputChannel") } } func (nodeCtx *nodeCtx) ReceiveMsg(wg *sync.WaitGroup, msg *Msg, inputChanIdx int) { nodeCtx.inputChannels[inputChanIdx] <- msg - //fmt.Println((*nodeCtx.node).Name(), "receive to input channel ", inputChanIdx) + // fmt.Println((*nodeCtx.node).Name(), "receive to input channel ", inputChanIdx) wg.Done() } @@ -101,13 +93,8 @@ func (nodeCtx *nodeCtx) collectInputMessages() { // and move them to inputMessages. for i := 0; i < inputsNum; i++ { channel := nodeCtx.inputChannels[i] - msg, ok := <-channel - if !ok { - // TODO: add status - log.Println("input channel closed") - return - } - nodeCtx.inputMessages[i] = msg + msg := <-channel + nodeCtx.inputMessages = append(nodeCtx.inputMessages, msg) } } diff --git a/scripts/README.md b/scripts/README.md index 5407d6bb3..cdd565511 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -37,7 +37,7 @@ cd milvus-distributed pwd_dir=`pwd` export PATH=$PATH:$(go env GOPATH)/bin - export protoc=${pwd_dir}/internal/core/cmake_build/thirdparty/protobuf/protobuf-build/protoc + export protoc=${pwd_dir}/cmake_build/thirdparty/protobuf/protobuf-build/protoc ./ci/scripts/proto_gen_go.sh ``` -- GitLab