flow_graph_insert_node.go 3.5 KB
Newer Older
N
neza2017 已提交
1
package querynode
2 3 4 5 6

import (
	"fmt"
	"log"
	"sync"
C
cai.zhang 已提交
7 8

	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
9 10 11 12
)

type insertNode struct {
	BaseNode
X
XuanYang-cn 已提交
13
	replica collectionReplica
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
}

// InsertData buffers the rows of one insert message grouped by segment.
// Every map is keyed by segment ID.
type InsertData struct {
	// Row IDs collected for each segment.
	insertIDs        map[UniqueID][]UniqueID
	// Timestamps collected for each segment.
	insertTimestamps map[UniqueID][]Timestamp
	// Row data blobs collected for each segment.
	insertRecords    map[UniqueID][]*commonpb.Blob
	// Offset returned by segmentPreInsert for each segment; only present for
	// segments on which the pre-insert step was performed.
	insertOffset     map[UniqueID]int64
}

// Name returns the fixed identifier of this node.
func (iNode *insertNode) Name() string {
	const nodeName = "iNode"
	return nodeName
}

func (iNode *insertNode) Operate(in []*Msg) []*Msg {
	// fmt.Println("Do insertNode operation")

	if len(in) != 1 {
C
cai.zhang 已提交
31
		log.Println("Invalid operate message input in insertNode, input length = ", len(in))
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
		// TODO: add error handling
	}

	iMsg, ok := (*in[0]).(*insertMsg)
	if !ok {
		log.Println("type assertion failed for insertMsg")
		// TODO: add error handling
	}

	insertData := InsertData{
		insertIDs:        make(map[int64][]int64),
		insertTimestamps: make(map[int64][]uint64),
		insertRecords:    make(map[int64][]*commonpb.Blob),
		insertOffset:     make(map[int64]int64),
	}

	// 1. hash insertMessages to insertData
	for _, task := range iMsg.insertMessages {
50 51 52
		insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
		insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
		insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
D
dragondriver 已提交
53 54

		// check if segment exists, if not, create this segment
X
XuanYang-cn 已提交
55 56
		if !iNode.replica.hasSegment(task.SegmentID) {
			collection, err := iNode.replica.getCollectionByName(task.CollectionName)
D
dragondriver 已提交
57 58 59 60
			if err != nil {
				log.Println(err)
				continue
			}
X
XuanYang-cn 已提交
61
			err = iNode.replica.addSegment(task.SegmentID, task.PartitionTag, collection.ID())
D
dragondriver 已提交
62 63 64 65 66
			if err != nil {
				log.Println(err)
				continue
			}
		}
67 68 69 70
	}

	// 2. do preInsert
	for segmentID := range insertData.insertRecords {
X
XuanYang-cn 已提交
71
		var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
		if err != nil {
			log.Println("preInsert failed")
			// TODO: add error handling
		}

		var numOfRecords = len(insertData.insertRecords[segmentID])
		if targetSegment != nil {
			var offset = targetSegment.segmentPreInsert(numOfRecords)
			insertData.insertOffset[segmentID] = offset
		}
	}

	// 3. do insert
	wg := sync.WaitGroup{}
	for segmentID := range insertData.insertRecords {
		wg.Add(1)
		go iNode.insert(&insertData, segmentID, &wg)
	}
	wg.Wait()

	var res Msg = &serviceTimeMsg{
93
		gcRecord:  iMsg.gcRecord,
94 95 96 97 98 99
		timeRange: iMsg.timeRange,
	}
	return []*Msg{&res}
}

func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
X
XuanYang-cn 已提交
100
	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
101 102 103
	if err != nil {
		log.Println("cannot find segment:", segmentID)
		// TODO: add error handling
N
neza2017 已提交
104
		wg.Done()
105 106 107 108 109 110 111 112 113 114
		return
	}

	ids := insertData.insertIDs[segmentID]
	timestamps := insertData.insertTimestamps[segmentID]
	records := insertData.insertRecords[segmentID]
	offsets := insertData.insertOffset[segmentID]

	err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
	if err != nil {
N
neza2017 已提交
115
		log.Println(err)
116
		// TODO: add error handling
N
neza2017 已提交
117
		wg.Done()
118 119 120 121 122 123 124
		return
	}

	fmt.Println("Do insert done, len = ", len(insertData.insertIDs[segmentID]))
	wg.Done()
}

X
XuanYang-cn 已提交
125
func newInsertNode(replica collectionReplica) *insertNode {
126 127
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism
F
FluorineDog 已提交
128

129 130 131 132 133
	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	return &insertNode{
D
dragondriver 已提交
134 135
		BaseNode: baseNode,
		replica:  replica,
136 137
	}
}