flow_graph_insert_buffer_node.go 25.4 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14 15 16 17
package datanode

import (
	"bytes"
	"context"
	"encoding/binary"
X
XuanYang-cn 已提交
18
	"fmt"
X
XuanYang-cn 已提交
19 20
	"path"
	"strconv"
21
	"sync"
X
XuanYang-cn 已提交
22 23
	"unsafe"

X
XuanYang-cn 已提交
24
	"go.uber.org/zap"
X
XuanYang-cn 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30 31 32
	"github.com/milvus-io/milvus/internal/kv"
	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/flowgraph"
	"github.com/milvus-io/milvus/internal/util/trace"
33
	"github.com/opentracing/opentracing-go"
X
Xiangyu Wang 已提交
34 35

	"github.com/milvus-io/milvus/internal/proto/commonpb"
Y
yangxuan 已提交
36
	"github.com/milvus-io/milvus/internal/proto/datapb"
X
Xiangyu Wang 已提交
37 38 39
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
X
XuanYang-cn 已提交
40 41 42 43 44 45 46 47 48 49 50
)

const (
	// CollectionPrefix is the "/collection/" path segment. It is not referenced
	// elsewhere in this file; presumably it prefixes collection meta keys
	// (TODO confirm against callers).
	CollectionPrefix = "/collection/"
	// SegmentPrefix is the "/segment/" path segment. It is not referenced
	// elsewhere in this file; presumably it prefixes segment meta keys
	// (TODO confirm against callers).
	SegmentPrefix    = "/segment/"
)

type (
	// InsertData is an alias of storage.InsertData, the per-segment container
	// of field data buffered by the insert buffer node.
	InsertData = storage.InsertData
	// Blob is an alias of storage.Blob.
	Blob       = storage.Blob
)
51 52
type insertBufferNode struct {
	BaseNode
S
sunby 已提交
53
	channelName  string
54 55
	insertBuffer *insertBuffer
	replica      Replica
56
	idAllocator  allocatorInterface
57
	flushMap     sync.Map
58
	flushChan    <-chan *flushMsg
59

G
godchen 已提交
60
	minIOKV kv.BaseKV
61 62 63

	timeTickStream          msgstream.MsgStream
	segmentStatisticsStream msgstream.MsgStream
N
neza2017 已提交
64

N
neza2017 已提交
65 66 67
	dsSaveBinlog          func(fu *segmentFlushUnit) error
	segmentCheckPoints    map[UniqueID]segmentCheckPoint
	segmentCheckPointLock sync.Mutex
N
neza2017 已提交
68 69
}

N
neza2017 已提交
70 71 72 73 74 75
// segmentCheckPoint pairs a row count with a message-stream position for one
// segment.
type segmentCheckPoint struct {
	numRows int64                  // number of rows recorded at this check point
	pos     internalpb.MsgPosition // message-stream position of this check point
}

type segmentFlushUnit struct {
76 77 78 79 80 81
	collID         UniqueID
	segID          UniqueID
	field2Path     map[UniqueID]string
	checkPoint     map[UniqueID]segmentCheckPoint
	startPositions []*datapb.SegmentStartPosition
	flushed        bool
82 83 84 85
}

type insertBuffer struct {
	insertData map[UniqueID]*InsertData // SegmentID to InsertData
G
godchen 已提交
86
	maxSize    int64
87
}
X
XuanYang-cn 已提交
88

G
godchen 已提交
89
func (ib *insertBuffer) size(segmentID UniqueID) int64 {
X
XuanYang-cn 已提交
90 91 92 93 94 95 96 97
	if ib.insertData == nil || len(ib.insertData) <= 0 {
		return 0
	}
	idata, ok := ib.insertData[segmentID]
	if !ok {
		return 0
	}

G
godchen 已提交
98
	var maxSize int64 = 0
X
XuanYang-cn 已提交
99 100
	for _, data := range idata.Data {
		fdata, ok := data.(*storage.FloatVectorFieldData)
G
godchen 已提交
101 102 103 104 105 106 107 108 109 110
		if ok {
			totalNumRows := int64(0)
			if fdata.NumRows != nil {
				for _, numRow := range fdata.NumRows {
					totalNumRows += numRow
				}
			}
			if totalNumRows > maxSize {
				maxSize = totalNumRows
			}
X
XuanYang-cn 已提交
111 112 113
		}

		bdata, ok := data.(*storage.BinaryVectorFieldData)
G
godchen 已提交
114 115 116 117 118 119 120 121 122 123
		if ok {
			totalNumRows := int64(0)
			if bdata.NumRows != nil {
				for _, numRow := range bdata.NumRows {
					totalNumRows += numRow
				}
			}
			if totalNumRows > maxSize {
				maxSize = totalNumRows
			}
X
XuanYang-cn 已提交
124 125 126 127 128 129 130
		}

	}
	return maxSize
}

func (ib *insertBuffer) full(segmentID UniqueID) bool {
G
godchen 已提交
131
	log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int64("size", ib.size(segmentID)), zap.Int64("maxsize", ib.maxSize))
X
XuanYang-cn 已提交
132 133 134 135 136 137 138
	return ib.size(segmentID) >= ib.maxSize
}

// Name returns the fixed identifier of this flow-graph node, "ibNode".
func (ibNode *insertBufferNode) Name() string {
	return "ibNode"
}

139
func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
X
XuanYang-cn 已提交
140

S
sunby 已提交
141 142
	// log.Debug("InsertBufferNode Operating")

X
XuanYang-cn 已提交
143
	if len(in) != 1 {
X
XuanYang-cn 已提交
144
		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
X
XuanYang-cn 已提交
145 146 147
		// TODO: add error handling
	}

148
	iMsg, ok := in[0].(*insertMsg)
X
XuanYang-cn 已提交
149
	if !ok {
X
XuanYang-cn 已提交
150
		log.Error("type assertion failed for insertMsg")
X
XuanYang-cn 已提交
151 152 153
		// TODO: add error handling
	}

B
bigsheeper 已提交
154
	if iMsg == nil {
155 156
		ibNode.timeTickStream.Close()
		ibNode.segmentStatisticsStream.Close()
157 158 159 160 161 162 163 164
		return []Msg{}
	}

	var spans []opentracing.Span
	for _, msg := range iMsg.insertMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
B
bigsheeper 已提交
165 166
	}

167 168 169 170 171 172 173 174
	// replace pchannel with vchannel
	for _, pos := range iMsg.startPositions {
		pos.ChannelName = ibNode.channelName
	}
	for _, pos := range iMsg.endPositions {
		pos.ChannelName = ibNode.channelName
	}

X
XuanYang-cn 已提交
175
	// Updating segment statistics
X
XuanYang-cn 已提交
176
	uniqueSeg := make(map[UniqueID]int64)
177
	for _, msg := range iMsg.insertMessages {
178

179
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
180 181 182
		collID := msg.GetCollectionID()
		partitionID := msg.GetPartitionID()

183
		if !ibNode.replica.hasSegment(currentSegID, true) {
X
XuanYang-cn 已提交
184 185
			err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
				iMsg.startPositions[0], iMsg.endPositions[0])
X
XuanYang-cn 已提交
186
			if err != nil {
X
XuanYang-cn 已提交
187
				log.Error("add segment wrong", zap.Error(err))
X
XuanYang-cn 已提交
188
			}
X
XuanYang-cn 已提交
189

X
XuanYang-cn 已提交
190 191
		}

X
XuanYang-cn 已提交
192 193
		segNum := uniqueSeg[currentSegID]
		uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
194
	}
X
XuanYang-cn 已提交
195

196
	segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
X
XuanYang-cn 已提交
197
	for id, num := range uniqueSeg {
198
		segToUpdate = append(segToUpdate, id)
X
XuanYang-cn 已提交
199 200 201 202 203

		err := ibNode.replica.updateStatistics(id, num)
		if err != nil {
			log.Error("update Segment Row number wrong", zap.Error(err))
		}
204
	}
X
XuanYang-cn 已提交
205

206 207
	if len(segToUpdate) > 0 {
		err := ibNode.updateSegStatistics(segToUpdate)
X
XuanYang-cn 已提交
208 209
		if err != nil {
			log.Error("update segment statistics error", zap.Error(err))
X
XuanYang-cn 已提交
210
		}
211 212
	}

X
XuanYang-cn 已提交
213 214 215 216
	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
X
XuanYang-cn 已提交
217
			log.Error("misaligned messages detected")
X
XuanYang-cn 已提交
218 219 220
			continue
		}
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
221
		collectionID := msg.GetCollectionID()
X
XuanYang-cn 已提交
222 223 224 225 226 227 228 229

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

X
XuanYang-cn 已提交
230 231
		// 1.1 Get Collection Schema
		collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs())
X
XuanYang-cn 已提交
232 233
		if err != nil {
			// GOOSE TODO add error handler
S
sunby 已提交
234
			log.Error("Get schema wrong:", zap.Error(err))
X
XuanYang-cn 已提交
235 236 237 238 239
			continue
		}

		// 1.2 Get Fields
		var pos int = 0 // Record position of blob
240 241 242 243 244 245 246
		var fieldIDs []int64
		var fieldTypes []schemapb.DataType
		for _, field := range collSchema.Fields {
			fieldIDs = append(fieldIDs, field.FieldID)
			fieldTypes = append(fieldTypes, field.DataType)
		}

X
XuanYang-cn 已提交
247 248
		for _, field := range collSchema.Fields {
			switch field.DataType {
G
godchen 已提交
249
			case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
250 251 252 253 254
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
N
neza2017 已提交
255
							log.Error("strconv wrong on get dim", zap.Error(err))
X
XuanYang-cn 已提交
256 257 258 259 260
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
261 262
					log.Error("invalid dim")
					continue
X
XuanYang-cn 已提交
263 264 265 266 267
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
G
godchen 已提交
268
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
283
							log.Error("binary.read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
284 285 286 287 288 289
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
G
godchen 已提交
290
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
291

G
godchen 已提交
292
			case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
293 294 295 296 297
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
X
XuanYang-cn 已提交
298
							log.Error("strconv wrong")
X
XuanYang-cn 已提交
299 300 301 302 303
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
304
					log.Error("invalid dim")
X
XuanYang-cn 已提交
305 306 307 308 309
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
G
godchen 已提交
310
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
311 312 313 314 315 316 317 318
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
X
xige-16 已提交
319
					bv := blob.GetValue()[pos : pos+(dim/8)]
X
XuanYang-cn 已提交
320 321 322 323
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
G
godchen 已提交
324
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
325

G
godchen 已提交
326
			case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
327 328
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
G
godchen 已提交
329
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
330 331 332 333 334 335 336 337 338
						Data:    make([]bool, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
339
						log.Error("binary.Read bool wrong", zap.Error(err))
X
XuanYang-cn 已提交
340 341 342 343 344
					}
					fieldData.Data = append(fieldData.Data, v)

				}
				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
345
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
346

G
godchen 已提交
347
			case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
348 349
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
G
godchen 已提交
350
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
351 352 353 354 355 356 357 358 359
						Data:    make([]int8, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
360
						log.Error("binary.Read int8 wrong", zap.Error(err))
X
XuanYang-cn 已提交
361 362 363 364
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
365
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
366

G
godchen 已提交
367
			case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
368 369
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
G
godchen 已提交
370
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
371 372 373 374 375 376 377 378 379
						Data:    make([]int16, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
380
						log.Error("binary.Read int16 wrong", zap.Error(err))
X
XuanYang-cn 已提交
381 382 383 384
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
385
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
386

G
godchen 已提交
387
			case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
388 389
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
G
godchen 已提交
390
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
391 392 393 394 395 396 397 398 399
						Data:    make([]int32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
G
godchen 已提交
400
						log.Error("binary.Read int64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
401 402 403 404
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
405
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
406

G
godchen 已提交
407
			case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
408 409
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
G
godchen 已提交
410
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
411 412 413 414 415 416 417 418
						Data:    make([]int64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
G
godchen 已提交
419
					fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
420 421 422 423
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
G
godchen 已提交
424
					fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
425 426 427 428 429
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
430
							log.Error("binary.Read int64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
431 432 433 434
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
435
					fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
436 437
				}

G
godchen 已提交
438
			case schemapb.DataType_Float:
X
XuanYang-cn 已提交
439 440
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
G
godchen 已提交
441
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
442 443 444 445 446 447 448 449 450
						Data:    make([]float32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
451
						log.Error("binary.Read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
452 453 454 455
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
456
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
457

G
godchen 已提交
458
			case schemapb.DataType_Double:
X
XuanYang-cn 已提交
459 460
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
G
godchen 已提交
461
						NumRows: make([]int64, 0, 1),
X
XuanYang-cn 已提交
462 463 464 465 466 467 468 469 470
						Data:    make([]float64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
471
						log.Error("binary.Read float64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
472 473 474 475 476
					}
					fieldData.Data = append(fieldData.Data, v)
				}

				pos += int(unsafe.Sizeof(*(&v)))
G
godchen 已提交
477
				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
X
XuanYang-cn 已提交
478 479 480 481 482 483
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata

484
		// store current endPositions as Segment->EndPostion
X
XuanYang-cn 已提交
485
		ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
486 487
		// update segment pk filter
		ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
X
XuanYang-cn 已提交
488 489 490
	}

	if len(iMsg.insertMessages) > 0 {
X
XuanYang-cn 已提交
491
		log.Debug("---insert buffer status---")
X
XuanYang-cn 已提交
492 493 494
		var stopSign int = 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
X
XuanYang-cn 已提交
495
				log.Debug("......")
X
XuanYang-cn 已提交
496 497
				break
			}
G
godchen 已提交
498
			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int64("buffer size", ibNode.insertBuffer.size(k)))
X
XuanYang-cn 已提交
499 500 501 502
			stopSign++
		}
	}

N
neza2017 已提交
503
	finishCh := make(chan segmentFlushUnit, len(segToUpdate))
N
neza2017 已提交
504
	finishCnt := sync.WaitGroup{}
S
sunby 已提交
505 506 507 508
	for _, segToFlush := range segToUpdate {
		// If full, auto flush
		if ibNode.insertBuffer.full(segToFlush) {
			log.Debug(". Insert Buffer full, auto flushing ",
G
godchen 已提交
509
				zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush)))
X
XuanYang-cn 已提交
510

S
sunby 已提交
511
			collMeta, err := ibNode.getCollMetabySegID(segToFlush, iMsg.timeRange.timestampMax)
S
sunby 已提交
512 513 514 515
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
				continue
			}
516

S
sunby 已提交
517 518 519 520 521 522
			ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
			delete(ibNode.insertBuffer.insertData, segToFlush)

			collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
523
				continue
X
XuanYang-cn 已提交
524
			}
N
neza2017 已提交
525
			finishCnt.Add(1)
526

S
sunby 已提交
527
			go flushSegment(collMeta, segToFlush, partitionID, collID,
N
neza2017 已提交
528
				&ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode, ibNode.idAllocator)
N
neza2017 已提交
529 530 531 532 533 534 535 536 537
		}
	}
	finishCnt.Wait()
	close(finishCh)
	for fu := range finishCh {
		if fu.field2Path == nil {
			log.Debug("segment is empty")
			continue
		}
X
XuanYang-cn 已提交
538
		fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
N
neza2017 已提交
539
		fu.flushed = false
540
		if err := ibNode.dsSaveBinlog(&fu); err != nil {
N
neza2017 已提交
541
			log.Debug("data service save bin log path failed", zap.Error(err))
S
sunby 已提交
542 543 544
		}
	}

545
	// iMsg is Flush() msg from datacoord
546 547 548
	select {
	case fmsg := <-ibNode.flushChan:
		currentSegID := fmsg.segmentID
S
sunby 已提交
549 550 551 552
		log.Debug(". Receiving flush message",
			zap.Int64("segmentID", currentSegID),
			zap.Int64("collectionID", fmsg.collectionID),
		)
S
sunby 已提交
553 554 555

		if ibNode.insertBuffer.size(currentSegID) <= 0 {
			log.Debug(".. Buffer empty ...")
N
neza2017 已提交
556 557 558 559
			ibNode.dsSaveBinlog(&segmentFlushUnit{
				collID:     fmsg.collectionID,
				segID:      currentSegID,
				field2Path: map[UniqueID]string{},
X
XuanYang-cn 已提交
560
				checkPoint: ibNode.replica.listSegmentsCheckPoints(),
N
neza2017 已提交
561
				flushed:    true,
N
neza2017 已提交
562
			})
X
XuanYang-cn 已提交
563
			ibNode.replica.segmentFlushed(currentSegID)
S
sunby 已提交
564
			fmsg.dmlFlushedCh <- []*datapb.FieldBinlog{{FieldID: currentSegID, Binlogs: []string{}}}
X
XuanYang-cn 已提交
565
		} else { //insertBuffer(not empty) -> binLogs -> minIO/S3
566
			log.Debug(".. Buffer not empty, flushing ..")
N
neza2017 已提交
567
			finishCh := make(chan segmentFlushUnit, 1)
N
neza2017 已提交
568

569 570
			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)
X
XuanYang-cn 已提交
571
			clearFn := func() {
N
neza2017 已提交
572
				finishCh <- segmentFlushUnit{field2Path: nil}
X
XuanYang-cn 已提交
573 574
				log.Debug(".. Clearing flush Buffer ..")
				ibNode.flushMap.Delete(currentSegID)
N
neza2017 已提交
575
				close(finishCh)
S
sunby 已提交
576
				fmsg.dmlFlushedCh <- []*datapb.FieldBinlog{{FieldID: currentSegID, Binlogs: nil}}
X
XuanYang-cn 已提交
577
			}
578

X
XuanYang-cn 已提交
579
			collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(currentSegID)
580 581
			if err != nil {
				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
X
XuanYang-cn 已提交
582
				clearFn()
N
neza2017 已提交
583
				break
S
sunby 已提交
584
				// TODO add error handling
585 586
			}

X
XuanYang-cn 已提交
587
			collMeta, err := ibNode.getCollMetabySegID(currentSegID, iMsg.timeRange.timestampMax)
588
			if err != nil {
589
				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
X
XuanYang-cn 已提交
590
				clearFn()
N
neza2017 已提交
591
				break
S
sunby 已提交
592
				// TODO add error handling
593
			}
594

X
XuanYang-cn 已提交
595
			flushSegment(collMeta, currentSegID, partitionID, collID,
N
neza2017 已提交
596
				&ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode, ibNode.idAllocator)
N
neza2017 已提交
597 598 599
			fu := <-finishCh
			close(finishCh)
			if fu.field2Path != nil {
X
XuanYang-cn 已提交
600
				fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
N
neza2017 已提交
601
				fu.flushed = true
S
sunby 已提交
602 603
				if err := ibNode.dsSaveBinlog(&fu); err != nil {
					log.Debug("Data service save binlog path failed", zap.Error(err))
N
neza2017 已提交
604
				} else {
X
XuanYang-cn 已提交
605
					ibNode.replica.segmentFlushed(fu.segID)
N
neza2017 已提交
606 607
				}
			}
S
sunby 已提交
608
			fmsg.dmlFlushedCh <- []*datapb.FieldBinlog{{FieldID: currentSegID, Binlogs: []string{}}}
609 610
		}

611
	default:
X
XuanYang-cn 已提交
612 613
	}

614
	// TODO write timetick
N
neza2017 已提交
615 616 617
	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
	}
X
XuanYang-cn 已提交
618

619 620 621
	for _, sp := range spans {
		sp.Finish()
	}
X
XuanYang-cn 已提交
622

623
	return nil
X
XuanYang-cn 已提交
624 625
}

X
XuanYang-cn 已提交
626 627 628 629 630 631 632 633 634 635
func flushSegment(
	collMeta *etcdpb.CollectionMeta,
	segID, partitionID, collID UniqueID,
	insertData *sync.Map,
	kv kv.BaseKV,
	flushUnit chan<- segmentFlushUnit,
	wgFinish *sync.WaitGroup,
	ibNode *insertBufferNode,
	idAllocator allocatorInterface) {

N
neza2017 已提交
636 637 638
	if wgFinish != nil {
		defer wgFinish.Done()
	}
X
XuanYang-cn 已提交
639

X
XuanYang-cn 已提交
640
	clearFn := func(isSuccess bool) {
641
		if !isSuccess {
N
neza2017 已提交
642
			flushUnit <- segmentFlushUnit{field2Path: nil}
643 644
		}

645 646
		log.Debug(".. Clearing flush Buffer ..")
		insertData.Delete(segID)
X
XuanYang-cn 已提交
647
	}
X
XuanYang-cn 已提交
648 649 650 651

	inCodec := storage.NewInsertCodec(collMeta)

	// buffer data to binlogs
652 653 654
	data, ok := insertData.Load(segID)
	if !ok {
		log.Error("Flush failed ... cannot load insertData ..")
X
XuanYang-cn 已提交
655
		clearFn(false)
656 657
		return
	}
X
XuanYang-cn 已提交
658

659
	binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
X
XuanYang-cn 已提交
660
	if err != nil {
661
		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
X
XuanYang-cn 已提交
662
		clearFn(false)
663
		return
X
XuanYang-cn 已提交
664 665
	}

666 667 668 669
	log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
	field2Path := make(map[UniqueID]string, len(binLogs))
	kvs := make(map[string]string, len(binLogs))
	paths := make([]string, 0, len(binLogs))
670 671 672
	field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))

	// write insert binlog
673 674
	for _, blob := range binLogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
N
neza2017 已提交
675
		log.Debug("save binlog", zap.Int64("fieldID", fieldID))
X
XuanYang-cn 已提交
676
		if err != nil {
677
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
X
XuanYang-cn 已提交
678
			clearFn(false)
679
			return
X
XuanYang-cn 已提交
680 681
		}

682
		logidx, err := idAllocator.allocID()
X
XuanYang-cn 已提交
683
		if err != nil {
684
			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
X
XuanYang-cn 已提交
685
			clearFn(false)
686
			return
X
XuanYang-cn 已提交
687 688
		}

689 690 691
		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

692 693 694 695
		key := path.Join(Params.InsertBinlogRootPath, k)
		paths = append(paths, key)
		kvs[key] = string(blob.Value[:])
		field2Path[fieldID] = key
696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
		field2Logidx[fieldID] = logidx
	}

	// write stats binlog
	for _, blob := range statsBinlogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
		if err != nil {
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
			clearFn(false)
			return
		}

		logidx := field2Logidx[fieldID]

		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

		key := path.Join(Params.StatsBinlogRootPath, k)
		kvs[key] = string(blob.Value[:])
715
	}
N
neza2017 已提交
716
	log.Debug("save binlog file to MinIO/S3")
717 718 719 720 721

	err = kv.MultiSave(kvs)
	if err != nil {
		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
		_ = kv.MultiRemove(paths)
X
XuanYang-cn 已提交
722
		clearFn(false)
723 724
		return
	}
X
XuanYang-cn 已提交
725

X
XuanYang-cn 已提交
726 727
	ibNode.replica.updateSegmentCheckPoint(segID)
	startPos := ibNode.replica.listNewSegmentsStartPositions()
728
	flushUnit <- segmentFlushUnit{collID: collID, segID: segID, field2Path: field2Path, startPositions: startPos}
729 730 731
	clearFn(true)
}

X
XuanYang-cn 已提交
732 733
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	msgPack := msgstream.MsgPack{}
S
sunby 已提交
734
	timeTickMsg := msgstream.DataNodeTtMsg{
X
XuanYang-cn 已提交
735 736 737 738 739
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
S
sunby 已提交
740
		DataNodeTtMsg: datapb.DataNodeTtMsg{
X
XuanYang-cn 已提交
741
			Base: &commonpb.MsgBase{
S
sunby 已提交
742 743 744
				MsgType:   commonpb.MsgType_DataNodeTt,
				MsgID:     0,
				Timestamp: ts,
X
XuanYang-cn 已提交
745
			},
S
sunby 已提交
746 747
			ChannelName: ibNode.channelName,
			Timestamp:   ts,
X
XuanYang-cn 已提交
748 749 750
		},
	}
	msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
751
	return ibNode.timeTickStream.Produce(&msgPack)
X
XuanYang-cn 已提交
752 753
}

X
XuanYang-cn 已提交
754
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
X
XuanYang-cn 已提交
755
	log.Debug("Updating segments statistics...")
G
godchen 已提交
756
	statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
757 758 759
	for _, segID := range segIDs {
		updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
		if err != nil {
X
XuanYang-cn 已提交
760
			log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
761 762
			continue
		}
763 764 765 766 767 768

		log.Debug("Segment Statistics to Update",
			zap.Int64("Segment ID", updates.GetSegmentID()),
			zap.Int64("NumOfRows", updates.GetNumRows()),
		)

769 770 771
		statsUpdates = append(statsUpdates, updates)
	}

G
godchen 已提交
772
	segStats := internalpb.SegmentStatistics{
773
		Base: &commonpb.MsgBase{
774
			MsgType:   commonpb.MsgType_SegmentStatistics,
775 776
			MsgID:     UniqueID(0),  // GOOSE TODO
			Timestamp: Timestamp(0), // GOOSE TODO
X
XuanYang-cn 已提交
777
			SourceID:  Params.NodeID,
778 779 780 781 782 783
		},
		SegStats: statsUpdates,
	}

	var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
		BaseMsg: msgstream.BaseMsg{
X
XuanYang-cn 已提交
784
			HashValues: []uint32{0}, // GOOSE TODO
785 786 787 788 789 790 791
		},
		SegmentStatistics: segStats,
	}

	var msgPack = msgstream.MsgPack{
		Msgs: []msgstream.TsMsg{msg},
	}
792
	return ibNode.segmentStatisticsStream.Produce(&msgPack)
793 794
}

S
sunby 已提交
795
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timestamp) (meta *etcdpb.CollectionMeta, err error) {
796
	if !ibNode.replica.hasSegment(segmentID, true) {
X
XuanYang-cn 已提交
797
		return nil, fmt.Errorf("No such segment %d in the replica", segmentID)
798 799
	}

X
XuanYang-cn 已提交
800 801
	collID := ibNode.replica.getCollectionID()
	sch, err := ibNode.replica.getCollectionSchema(collID, ts)
802 803 804
	if err != nil {
		return
	}
S
sunby 已提交
805 806

	meta = &etcdpb.CollectionMeta{
X
XuanYang-cn 已提交
807 808
		ID:     collID,
		Schema: sch,
S
sunby 已提交
809
	}
810 811 812 813
	return
}

func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
X
XuanYang-cn 已提交
814
	return ibNode.replica.getCollectionAndPartitionID(segmentID)
815 816
}

N
neza2017 已提交
817 818 819 820 821 822
func newInsertBufferNode(
	ctx context.Context,
	replica Replica,
	factory msgstream.Factory,
	idAllocator allocatorInterface,
	flushCh <-chan *flushMsg,
N
neza2017 已提交
823
	saveBinlog func(*segmentFlushUnit) error,
S
sunby 已提交
824
	channelName string,
N
neza2017 已提交
825 826
) *insertBufferNode {

X
XuanYang-cn 已提交
827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism

	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	maxSize := Params.FlushInsertBufferSize
	iBuffer := &insertBuffer{
		insertData: make(map[UniqueID]*InsertData),
		maxSize:    maxSize,
	}

	// MinIO
	option := &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}

	minIOKV, err := miniokv.NewMinIOKV(ctx, option)
	if err != nil {
		panic(err)
	}

855
	//input stream, data node time tick
Z
zhenshan.cao 已提交
856 857
	wTt, _ := factory.NewMsgStream(ctx)
	wTt.AsProducer([]string{Params.TimeTickChannelName})
X
Xiangyu Wang 已提交
858
	log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
859
	var wTtMsgStream msgstream.MsgStream = wTt
X
XuanYang-cn 已提交
860
	wTtMsgStream.Start()
861 862

	// update statistics channel
Z
zhenshan.cao 已提交
863 864
	segS, _ := factory.NewMsgStream(ctx)
	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
X
Xiangyu Wang 已提交
865
	log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
866
	var segStatisticsMsgStream msgstream.MsgStream = segS
X
XuanYang-cn 已提交
867
	segStatisticsMsgStream.Start()
X
XuanYang-cn 已提交
868

X
XuanYang-cn 已提交
869
	return &insertBufferNode{
870 871 872
		BaseNode:     baseNode,
		insertBuffer: iBuffer,
		minIOKV:      minIOKV,
S
sunby 已提交
873
		channelName:  channelName,
874

X
XuanYang-cn 已提交
875 876
		timeTickStream:          wTtMsgStream,
		segmentStatisticsStream: segStatisticsMsgStream,
877

N
neza2017 已提交
878 879 880 881 882 883
		replica:            replica,
		flushMap:           sync.Map{},
		flushChan:          flushCh,
		idAllocator:        idAllocator,
		dsSaveBinlog:       saveBinlog,
		segmentCheckPoints: make(map[UniqueID]segmentCheckPoint),
X
XuanYang-cn 已提交
884 885
	}
}