flow_graph_insert_node.go 4.9 KB
Newer Older
N
neza2017 已提交
1
package querynode
2 3

import (
G
godchen 已提交
4
	"context"
5 6 7
	"fmt"
	"log"
	"sync"
C
cai.zhang 已提交
8

G
godchen 已提交
9 10
	"github.com/opentracing/opentracing-go"
	oplog "github.com/opentracing/opentracing-go/log"
C
cai.zhang 已提交
11
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
12
	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
13 14 15 16
)

type insertNode struct {
	BaseNode
X
XuanYang-cn 已提交
17
	replica collectionReplica
18 19 20
}

type InsertData struct {
G
godchen 已提交
21
	insertContext    map[int64]context.Context
22 23 24 25 26 27 28 29 30 31 32 33 34 35
	insertIDs        map[UniqueID][]UniqueID
	insertTimestamps map[UniqueID][]Timestamp
	insertRecords    map[UniqueID][]*commonpb.Blob
	insertOffset     map[UniqueID]int64
}

func (iNode *insertNode) Name() string {
	return "iNode"
}

func (iNode *insertNode) Operate(in []*Msg) []*Msg {
	// fmt.Println("Do insertNode operation")

	if len(in) != 1 {
C
cai.zhang 已提交
36
		log.Println("Invalid operate message input in insertNode, input length = ", len(in))
37 38 39 40 41 42 43 44 45
		// TODO: add error handling
	}

	iMsg, ok := (*in[0]).(*insertMsg)
	if !ok {
		log.Println("type assertion failed for insertMsg")
		// TODO: add error handling
	}

G
godchen 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
	var childs []opentracing.Span
	tracer := opentracing.GlobalTracer()
	if tracer != nil && iMsg != nil {
		for _, msg := range iMsg.insertMessages {
			if msg.Type() == internalPb.MsgType_kInsert || msg.Type() == internalPb.MsgType_kSearch {
				var child opentracing.Span
				ctx := msg.GetMsgContext()
				if parent := opentracing.SpanFromContext(ctx); parent != nil {
					child = tracer.StartSpan("pass filter node",
						opentracing.FollowsFrom(parent.Context()))
				} else {
					child = tracer.StartSpan("pass filter node")
				}
				child.SetTag("hash keys", msg.HashKeys())
				child.SetTag("start time", msg.BeginTs())
				child.SetTag("end time", msg.EndTs())
				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
				childs = append(childs, child)
			}
		}
	}

68
	insertData := InsertData{
G
godchen 已提交
69
		insertContext:    make(map[int64]context.Context),
70 71 72 73 74 75 76 77
		insertIDs:        make(map[int64][]int64),
		insertTimestamps: make(map[int64][]uint64),
		insertRecords:    make(map[int64][]*commonpb.Blob),
		insertOffset:     make(map[int64]int64),
	}

	// 1. hash insertMessages to insertData
	for _, task := range iMsg.insertMessages {
G
godchen 已提交
78
		insertData.insertContext[task.SegmentID] = task.GetMsgContext()
79 80 81
		insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
		insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
		insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
D
dragondriver 已提交
82 83

		// check if segment exists, if not, create this segment
X
XuanYang-cn 已提交
84 85
		if !iNode.replica.hasSegment(task.SegmentID) {
			collection, err := iNode.replica.getCollectionByName(task.CollectionName)
D
dragondriver 已提交
86 87 88 89
			if err != nil {
				log.Println(err)
				continue
			}
X
XuanYang-cn 已提交
90
			err = iNode.replica.addSegment(task.SegmentID, task.PartitionTag, collection.ID())
D
dragondriver 已提交
91 92 93 94 95
			if err != nil {
				log.Println(err)
				continue
			}
		}
96 97 98 99
	}

	// 2. do preInsert
	for segmentID := range insertData.insertRecords {
X
XuanYang-cn 已提交
100
		var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
		if err != nil {
			log.Println("preInsert failed")
			// TODO: add error handling
		}

		var numOfRecords = len(insertData.insertRecords[segmentID])
		if targetSegment != nil {
			var offset = targetSegment.segmentPreInsert(numOfRecords)
			insertData.insertOffset[segmentID] = offset
		}
	}

	// 3. do insert
	wg := sync.WaitGroup{}
	for segmentID := range insertData.insertRecords {
		wg.Add(1)
G
godchen 已提交
117
		go iNode.insert(insertData.insertContext[segmentID], &insertData, segmentID, &wg)
118 119 120 121
	}
	wg.Wait()

	var res Msg = &serviceTimeMsg{
B
bigsheeper 已提交
122
		gcRecord:  iMsg.gcRecord,
123 124
		timeRange: iMsg.timeRange,
	}
G
godchen 已提交
125 126 127
	for _, child := range childs {
		child.Finish()
	}
128 129 130
	return []*Msg{&res}
}

G
godchen 已提交
131 132 133
func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
	span, _ := opentracing.StartSpanFromContext(ctx, "insert node insert")
	defer span.Finish()
X
XuanYang-cn 已提交
134
	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
135 136 137
	if err != nil {
		log.Println("cannot find segment:", segmentID)
		// TODO: add error handling
N
neza2017 已提交
138
		wg.Done()
G
godchen 已提交
139
		span.LogFields(oplog.Error(err))
140 141 142 143 144 145 146 147 148 149
		return
	}

	ids := insertData.insertIDs[segmentID]
	timestamps := insertData.insertTimestamps[segmentID]
	records := insertData.insertRecords[segmentID]
	offsets := insertData.insertOffset[segmentID]

	err = targetSegment.segmentInsert(offsets, &ids, &timestamps, &records)
	if err != nil {
N
neza2017 已提交
150
		log.Println(err)
151
		// TODO: add error handling
N
neza2017 已提交
152
		wg.Done()
G
godchen 已提交
153
		span.LogFields(oplog.Error(err))
154 155 156 157 158 159 160
		return
	}

	fmt.Println("Do insert done, len = ", len(insertData.insertIDs[segmentID]))
	wg.Done()
}

X
XuanYang-cn 已提交
161
func newInsertNode(replica collectionReplica) *insertNode {
162 163
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism
F
FluorineDog 已提交
164

165 166 167 168 169
	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	return &insertNode{
D
dragondriver 已提交
170 171
		BaseNode: baseNode,
		replica:  replica,
172 173
	}
}