flow_graph_insert_node.go 3.6 KB
Newer Older
1
package querynode
2 3

import (
4
	"context"
5
	"sync"
C
cai.zhang 已提交
6

B
bigsheeper 已提交
7 8 9
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
C
cai.zhang 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
11 12 13
)

type insertNode struct {
14
	baseNode
15
	replica ReplicaInterface
16 17 18
}

type InsertData struct {
19
	insertContext    map[int64]context.Context
20 21 22 23 24 25 26 27 28 29
	insertIDs        map[UniqueID][]UniqueID
	insertTimestamps map[UniqueID][]Timestamp
	insertRecords    map[UniqueID][]*commonpb.Blob
	insertOffset     map[UniqueID]int64
}

// Name returns the identifier of this node within the flow graph.
func (iNode *insertNode) Name() string {
	const nodeName = "iNode"
	return nodeName
}

30
func (iNode *insertNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
B
bigsheeper 已提交
31
	//log.Debug("Do insertNode operation")
32 33

	if len(in) != 1 {
B
bigsheeper 已提交
34
		log.Error("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
35 36 37
		// TODO: add error handling
	}

38
	iMsg, ok := in[0].(*insertMsg)
39
	if !ok {
B
bigsheeper 已提交
40
		log.Error("type assertion failed for insertMsg")
41 42 43 44 45 46 47 48 49 50 51 52
		// TODO: add error handling
	}

	insertData := InsertData{
		insertIDs:        make(map[int64][]int64),
		insertTimestamps: make(map[int64][]uint64),
		insertRecords:    make(map[int64][]*commonpb.Blob),
		insertOffset:     make(map[int64]int64),
	}

	// 1. hash insertMessages to insertData
	for _, task := range iMsg.insertMessages {
53 54 55
		insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
		insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
		insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
D
dragondriver 已提交
56 57

		// check if segment exists, if not, create this segment
X
XuanYang-cn 已提交
58
		if !iNode.replica.hasSegment(task.SegmentID) {
59
			err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, segTypeGrowing)
D
dragondriver 已提交
60
			if err != nil {
B
bigsheeper 已提交
61
				log.Error(err.Error())
D
dragondriver 已提交
62 63 64
				continue
			}
		}
65 66 67 68
	}

	// 2. do preInsert
	for segmentID := range insertData.insertRecords {
X
XuanYang-cn 已提交
69
		var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
70
		if err != nil {
B
bigsheeper 已提交
71
			log.Error("preInsert failed")
72 73 74 75 76 77 78 79 80 81 82 83 84 85
			// TODO: add error handling
		}

		var numOfRecords = len(insertData.insertRecords[segmentID])
		if targetSegment != nil {
			var offset = targetSegment.segmentPreInsert(numOfRecords)
			insertData.insertOffset[segmentID] = offset
		}
	}

	// 3. do insert
	wg := sync.WaitGroup{}
	for segmentID := range insertData.insertRecords {
		wg.Add(1)
G
godchen 已提交
86
		go iNode.insert(&insertData, segmentID, &wg)
87 88 89 90
	}
	wg.Wait()

	var res Msg = &serviceTimeMsg{
B
bigsheeper 已提交
91
		gcRecord:  iMsg.gcRecord,
92 93
		timeRange: iMsg.timeRange,
	}
94
	return []Msg{res}, ctx
95 96
}

G
godchen 已提交
97
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
X
XuanYang-cn 已提交
98
	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
99
	if err != nil {
B
bigsheeper 已提交
100
		log.Error("cannot find segment:", zap.Int64("segmentID", segmentID))
101
		// TODO: add error handling
N
neza2017 已提交
102
		wg.Done()
103 104 105 106 107 108 109 110 111 112
		return
	}

	ids := insertData.insertIDs[segmentID]
	timestamps := insertData.insertTimestamps[segmentID]
	records := insertData.insertRecords[segmentID]
	offsets := insertData.insertOffset[segmentID]

	err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
	if err != nil {
B
bigsheeper 已提交
113
		log.Error(err.Error())
114
		// TODO: add error handling
N
neza2017 已提交
115
		wg.Done()
116 117 118
		return
	}

B
bigsheeper 已提交
119
	log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])))
120 121 122
	wg.Done()
}

123
func newInsertNode(replica ReplicaInterface) *insertNode {
124 125
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism
F
FluorineDog 已提交
126

127
	baseNode := baseNode{}
128 129 130 131
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	return &insertNode{
132
		baseNode: baseNode,
D
dragondriver 已提交
133
		replica:  replica,
134 135
	}
}