diff --git a/Makefile b/Makefile index 48f90b3fbfa858053f5357abc6723f3e0746fcf3..1ae87e6930a5061ce356517b24aa7e61d7032e58 100644 --- a/Makefile +++ b/Makefile @@ -127,7 +127,6 @@ install: all @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxy $(GOPATH)/bin/proxy @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/writenode $(GOPATH)/bin/writenode - @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/indexbuilder $(GOPATH)/bin/indexbuilder @mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH) @echo "Installation successful." @@ -135,10 +134,7 @@ clean: @echo "Cleaning up all the generated files" @find . -name '*.test' | xargs rm -fv @find . -name '*~' | xargs rm -fv - @rm -rf bin/ - @rm -rf lib/ - @rm -rf $(GOPATH)/bin/master - @rm -rf $(GOPATH)/bin/proxy - @rm -rf $(GOPATH)/bin/querynode - @rm -rf $(GOPATH)/bin/writenode - @rm -rf $(GOPATH)/bin/indexbuilder + @rm -rvf querynode + @rm -rvf master + @rm -rvf proxy + @rm -rvf writenode diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 44a595caf5679969e4455b592e70cdb1f9f34012..5ed2caa0675b53c4aa9a895a6983cd3f98dc0057 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -48,6 +48,7 @@ func (dsService *dataSyncService) initNodes() { var insertNode Node = newInsertNode(dsService.replica) var serviceTimeNode Node = newServiceTimeNode(dsService.replica) + var gcNode Node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&ddStreamNode) @@ -57,6 +58,7 @@ func (dsService *dataSyncService) initNodes() { dsService.fg.AddNode(&insertNode) dsService.fg.AddNode(&serviceTimeNode) + dsService.fg.AddNode(&gcNode) // dmStreamNode var err = dsService.fg.SetEdges(dmStreamNode.Name(), @@ -106,9 +108,17 @@ func (dsService *dataSyncService) initNodes() { // serviceTimeNode err = dsService.fg.SetEdges(serviceTimeNode.Name(), []string{insertNode.Name()}, - []string{}, + []string{gcNode.Name()}, ) if err != nil { log.Fatal("set edges failed in node:", serviceTimeNode.Name()) } + + // gcNode + err = dsService.fg.SetEdges(gcNode.Name(), + []string{serviceTimeNode.Name()}, + []string{}) + if err != nil { + log.Fatal("set edges failed in node:", gcNode.Name()) + } } diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index f4a5e1136946450de3a25f206f64a2b6ef7e0ec1..a7a2ac73201ff48439090ad0ef5f58d9c1b38e1d 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -44,6 +44,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { }, } ddNode.ddMsg = &ddMsg + gcRecord := gcRecord{ + collections: make([]UniqueID, 0), + partitions: make([]partitionWithID, 0), + } + ddNode.ddMsg.gcRecord = &gcRecord // sort tsMessages tsMessages := msMsg.TsMessages() @@ -115,11 +120,11 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { collectionID := msg.CollectionID - err := ddNode.replica.removeCollection(collectionID) - if err != nil { - log.Println(err) - return - } + //err := ddNode.replica.removeCollection(collectionID) + //if err != nil { + // log.Println(err) + // return + //} collectionName := msg.CollectionName.CollectionName ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName], @@ -127,6 +132,8 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { createOrDrop: false, timestamp: msg.Timestamp, }) + + ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID) } func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { @@ -150,17 +157,22 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { collectionID := msg.CollectionID partitionTag := msg.PartitionName.Tag - err := ddNode.replica.removePartition(collectionID, partitionTag) - if err != nil { - log.Println(err) - return - } + //err := ddNode.replica.removePartition(collectionID, partitionTag) + //if err != nil { + // log.Println(err) + // return + //} ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], metaOperateRecord{ createOrDrop: false, timestamp: msg.Timestamp, }) + + ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{ + partitionTag: partitionTag, + collectionID: collectionID, + }) } func newDDNode(replica collectionReplica) *ddNode { diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index ceddaeab0b95a1a1b1880def7689ee3278a20e3a..fbc8eedb5c82b00e868561b6e5971a9af3f78468 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -2,6 +2,7 @@ package querynode import ( "log" + "math" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -59,6 +60,7 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } } + iMsg.gcRecord = ddMsg.gcRecord var res Msg = &iMsg return []*Msg{&res} } @@ -81,17 +83,35 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg log.Println("Error, misaligned messages detected") return nil } + tmpTimestamps := make([]Timestamp, 0) tmpRowIDs := make([]int64, 0) tmpRowData := make([]*commonpb.Blob, 0) - targetTimestamp := records[len(records)-1].timestamp + + // calculate valid time range + timeBegin := Timestamp(0) + timeEnd := Timestamp(math.MaxUint64) + for _, record := range records { + if record.createOrDrop && timeBegin < record.timestamp { + timeBegin = record.timestamp + } + if !record.createOrDrop && timeEnd > record.timestamp { + timeEnd = record.timestamp + } + } + for i, t := range msg.Timestamps { - if t >= targetTimestamp { + if t >= timeBegin && t <= timeEnd { tmpTimestamps = append(tmpTimestamps, t) tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i]) tmpRowData = append(tmpRowData, msg.RowData[i]) } } + + if len(tmpRowIDs) <= 0 { + return nil + } + msg.Timestamps = tmpTimestamps msg.RowIDs = tmpRowIDs msg.RowData = tmpRowData diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go new file mode 100644 index 0000000000000000000000000000000000000000..cd0a9b984e7cf991436f3a3195935133e45c4c9a --- /dev/null +++ b/internal/querynode/flow_graph_gc_node.go @@ -0,0 +1,61 @@ +package querynode + +import ( + "log" +) + +type gcNode struct { + BaseNode + replica collectionReplica +} + +func (gcNode *gcNode) Name() string { + return "gcNode" +} + +func (gcNode *gcNode) Operate(in []*Msg) []*Msg { + //fmt.Println("Do gcNode operation") + + if len(in) != 1 { + log.Println("Invalid operate message input in gcNode, input length = ", len(in)) + // TODO: add error handling + } + + gcMsg, ok := (*in[0]).(*gcMsg) + if !ok { + log.Println("type assertion failed for gcMsg") + // TODO: add error handling + } + + // drop collections + for _, collectionID := range gcMsg.gcRecord.collections { + err := gcNode.replica.removeCollection(collectionID) + if err != nil { + log.Println(err) + } + } + + // drop partitions + for _, partition := range gcMsg.gcRecord.partitions { + err := gcNode.replica.removePartition(partition.collectionID, partition.partitionTag) + if err != nil { + log.Println(err) + } + } + + return nil +} + +func newGCNode(replica collectionReplica) *gcNode { + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + baseNode := BaseNode{} + baseNode.SetMaxQueueLength(maxQueueLength) + baseNode.SetMaxParallelism(maxParallelism) + + return &gcNode{ + BaseNode: baseNode, + replica: replica, + } +} diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index f60369521967aea4714a6099959fdcbadecb5883..9a2c8ca1f11e34738dfbfae93eaeb8e715b70ef3 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -90,6 +90,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { wg.Wait() var res Msg = &serviceTimeMsg{ + gcRecord: iMsg.gcRecord, timeRange: iMsg.timeRange, } return []*Msg{&res} diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 88a133fab34a84d21da9f635103c1d8f7466f5d0..451f9b6952ad003a3f687ee44550808cce6bc481 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -16,6 +16,7 @@ type key2SegMsg struct { type ddMsg struct { collectionRecords map[string][]metaOperateRecord partitionRecords map[string][]metaOperateRecord + gcRecord *gcRecord timeRange TimeRange } @@ -26,6 +27,7 @@ type metaOperateRecord struct { type insertMsg struct { insertMessages []*msgstream.InsertMsg + gcRecord *gcRecord timeRange TimeRange } @@ -35,6 +37,12 @@ type deleteMsg struct { } type serviceTimeMsg struct { + gcRecord *gcRecord + timeRange TimeRange +} + +type gcMsg struct { + gcRecord *gcRecord timeRange TimeRange } @@ -55,42 +63,39 @@ type DeletePreprocessData struct { count int32 } -func (ksMsg *key2SegMsg) TimeTick() Timestamp { - return ksMsg.timeRange.timestampMax +// TODO: replace partitionWithID by partition id +type partitionWithID struct { + partitionTag string + collectionID UniqueID } -func (ksMsg *key2SegMsg) DownStreamNodeIdx() int { - return 0 +type gcRecord struct { + // collections and partitions to be dropped + collections []UniqueID + // TODO: use partition id + partitions []partitionWithID } -func (suMsg *ddMsg) TimeTick() Timestamp { - return suMsg.timeRange.timestampMax +func (ksMsg *key2SegMsg) TimeTick() Timestamp { + return ksMsg.timeRange.timestampMax } -func (suMsg *ddMsg) DownStreamNodeIdx() int { - return 0 +func (suMsg *ddMsg) TimeTick() Timestamp { + return suMsg.timeRange.timestampMax } func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } -func (iMsg *insertMsg) DownStreamNodeIdx() int { - return 0 -} - func (dMsg *deleteMsg) TimeTick() Timestamp { return dMsg.timeRange.timestampMax } -func (dMsg *deleteMsg) DownStreamNodeIdx() int { - return 0 -} - func (stMsg *serviceTimeMsg) TimeTick() Timestamp { return stMsg.timeRange.timestampMax } -func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int { - return 0 +func (gcMsg *gcMsg) TimeTick() Timestamp { + return gcMsg.timeRange.timestampMax } diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 38feb029c8af67745dbedca59af89ee25b9b9023..275666560d589c95f57b1f13ddefa90b5eb76ee6 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -30,7 +30,12 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { // update service time stNode.replica.getTSafe().set(serviceTimeMsg.timeRange.timestampMax) //fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax)) - return nil + + var res Msg = &gcMsg{ + gcRecord: serviceTimeMsg.gcRecord, + timeRange: serviceTimeMsg.timeRange, + } + return []*Msg{&res} } func newServiceTimeNode(replica collectionReplica) *serviceTimeNode { diff --git a/internal/util/flowgraph/message.go b/internal/util/flowgraph/message.go index e5c01d7d4ef92872f38abc45713cd68f06f5a6ee..f02d2604cbee807f791a9877cf969e3047fdfb61 100644 --- a/internal/util/flowgraph/message.go +++ b/internal/util/flowgraph/message.go @@ -4,7 +4,6 @@ import "github.com/zilliztech/milvus-distributed/internal/msgstream" type Msg interface { TimeTick() Timestamp - DownStreamNodeIdx() int } type MsgStreamMsg struct { diff --git a/internal/writenode/flow_graph_message.go b/internal/writenode/flow_graph_message.go index 147822e7c5035b8574817dc5911c845727a92078..5825ef570e97425cbdda95ef44cda20576be9740 100644 --- a/internal/writenode/flow_graph_message.go +++ b/internal/writenode/flow_graph_message.go @@ -46,30 +46,14 @@ func (ksMsg *key2SegMsg) TimeTick() Timestamp { return ksMsg.timeRange.timestampMax } -func (ksMsg *key2SegMsg) DownStreamNodeIdx() int { - return 0 -} - func (suMsg *ddMsg) TimeTick() Timestamp { return suMsg.timeRange.timestampMax } -func (suMsg *ddMsg) DownStreamNodeIdx() int { - return 0 -} - func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } -func (iMsg *insertMsg) DownStreamNodeIdx() int { - return 0 -} - func (dMsg *deleteMsg) TimeTick() Timestamp { return dMsg.timeRange.timestampMax } - -func (dMsg *deleteMsg) DownStreamNodeIdx() int { - return 0 -}