package datanode

import (
	"bytes"
	"context"
	"encoding/binary"
	"path"
	"strconv"
	"sync"
	"unsafe"

	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/kv"
	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
	"github.com/zilliztech/milvus-distributed/internal/log"
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/storage"
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
	"github.com/zilliztech/milvus-distributed/internal/util/trace"
)

const (
	// CollectionPrefix is the "/collection/" path fragment.
	// NOTE(review): presumably a metadata key prefix for collections — confirm against its users.
	CollectionPrefix = "/collection/"
	// SegmentPrefix is the "/segment/" path fragment.
	// NOTE(review): presumably a metadata key prefix for segments — confirm against its users.
	SegmentPrefix    = "/segment/"
)

// Local aliases for the storage package types used throughout this file.
type (
	// InsertData is an alias of storage.InsertData; this file accesses its
	// Data map, keyed by field ID and holding storage.FieldData values.
	InsertData = storage.InsertData
	// Blob is an alias of storage.Blob.
	Blob       = storage.Blob
)
type insertBufferNode struct {
	BaseNode
	insertBuffer *insertBuffer
	replica      Replica
	flushMeta    *binlogMeta
	flushMap     sync.Map

G
godchen 已提交
45
	minIOKV kv.BaseKV
46 47 48 49 50 51 52 53 54 55

	timeTickStream          msgstream.MsgStream
	segmentStatisticsStream msgstream.MsgStream
	completeFlushStream     msgstream.MsgStream
}

// insertBuffer accumulates decoded insert data per segment until it is flushed.
type insertBuffer struct {
	insertData map[UniqueID]*InsertData // SegmentID to InsertData
	// maxSize is the row-count threshold: full() reports true once size()
	// for a segment reaches this value.
	maxSize    int32
}

// size reports how many rows are buffered for segmentID, measured as the
// largest NumRows among the segment's float-vector and binary-vector fields.
// It returns 0 when nothing is buffered for the segment or when the buffered
// data contains no vector field.
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
	// Looking up a key in a nil or empty map simply reports "not found".
	buffered, found := ib.insertData[segmentID]
	if !found {
		return 0
	}

	var rows int32
	for _, column := range buffered.Data {
		var n int32
		switch vec := column.(type) {
		case *storage.FloatVectorFieldData:
			n = int32(vec.NumRows)
		case *storage.BinaryVectorFieldData:
			n = int32(vec.NumRows)
		default:
			// Only vector fields contribute to the buffer size.
			continue
		}
		if n > rows {
			rows = n
		}
	}
	return rows
}

// full reports whether the rows buffered for segmentID have reached the
// buffer's maxSize threshold.
func (ib *insertBuffer) full(segmentID UniqueID) bool {
	buffered := ib.size(segmentID)
	return buffered >= ib.maxSize
}

// Name returns the fixed identifier of this flow-graph node.
func (ibNode *insertBufferNode) Name() string {
	const nodeName = "ibNode"
	return nodeName
}

func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
X
XuanYang-cn 已提交
91 92

	if len(in) != 1 {
X
XuanYang-cn 已提交
93
		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
X
XuanYang-cn 已提交
94 95 96
		// TODO: add error handling
	}

97
	iMsg, ok := in[0].(*insertMsg)
X
XuanYang-cn 已提交
98
	if !ok {
X
XuanYang-cn 已提交
99
		log.Error("type assertion failed for insertMsg")
X
XuanYang-cn 已提交
100 101 102
		// TODO: add error handling
	}

B
bigsheeper 已提交
103
	if iMsg == nil {
104 105 106 107 108 109 110 111
		return []Msg{}
	}

	var spans []opentracing.Span
	for _, msg := range iMsg.insertMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
B
bigsheeper 已提交
112 113
	}

X
XuanYang-cn 已提交
114
	// Updating segment statistics
X
XuanYang-cn 已提交
115
	uniqueSeg := make(map[UniqueID]int64)
116
	for _, msg := range iMsg.insertMessages {
117

118
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
119 120 121
		collID := msg.GetCollectionID()
		partitionID := msg.GetPartitionID()

122
		if !ibNode.replica.hasSegment(currentSegID) {
X
XuanYang-cn 已提交
123
			err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
X
XuanYang-cn 已提交
124
			if err != nil {
X
XuanYang-cn 已提交
125
				log.Error("add segment wrong", zap.Error(err))
X
XuanYang-cn 已提交
126
			}
X
XuanYang-cn 已提交
127 128 129 130 131

			switch {
			case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0:
				log.Error("insert Msg StartPosition empty")
			default:
Z
zhenshan.cao 已提交
132 133 134 135 136 137 138 139
				segment, err := ibNode.replica.getSegmentByID(currentSegID)
				if err != nil {
					log.Error("get segment wrong", zap.Error(err))
				}
				var startPosition *internalpb.MsgPosition = nil
				for _, pos := range iMsg.startPositions {
					if pos.ChannelName == segment.channelName {
						startPosition = pos
X
XuanYang-cn 已提交
140
						break
Z
zhenshan.cao 已提交
141 142 143 144 145 146 147
					}
				}
				if startPosition == nil {
					log.Error("get position wrong", zap.Error(err))
				} else {
					ibNode.replica.setStartPosition(currentSegID, startPosition)
				}
X
XuanYang-cn 已提交
148
			}
X
XuanYang-cn 已提交
149 150
		}

X
XuanYang-cn 已提交
151 152
		segNum := uniqueSeg[currentSegID]
		uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
153
	}
X
XuanYang-cn 已提交
154

155
	segIDs := make([]UniqueID, 0, len(uniqueSeg))
X
XuanYang-cn 已提交
156
	for id, num := range uniqueSeg {
157
		segIDs = append(segIDs, id)
X
XuanYang-cn 已提交
158 159 160 161 162

		err := ibNode.replica.updateStatistics(id, num)
		if err != nil {
			log.Error("update Segment Row number wrong", zap.Error(err))
		}
163
	}
X
XuanYang-cn 已提交
164 165

	if len(segIDs) > 0 {
X
XuanYang-cn 已提交
166 167 168
		err := ibNode.updateSegStatistics(segIDs)
		if err != nil {
			log.Error("update segment statistics error", zap.Error(err))
X
XuanYang-cn 已提交
169
		}
170 171
	}

X
XuanYang-cn 已提交
172 173 174 175
	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
X
XuanYang-cn 已提交
176
			log.Error("misaligned messages detected")
X
XuanYang-cn 已提交
177 178 179
			continue
		}
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
180
		collectionID := msg.GetCollectionID()
X
XuanYang-cn 已提交
181 182 183 184 185 186 187 188

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

X
XuanYang-cn 已提交
189 190
		// 1.1 Get CollectionMeta
		collection, err := ibNode.replica.getCollectionByID(collectionID)
X
XuanYang-cn 已提交
191 192
		if err != nil {
			// GOOSE TODO add error handler
X
XuanYang-cn 已提交
193
			log.Error("Get meta wrong:", zap.Error(err))
X
XuanYang-cn 已提交
194 195 196 197 198 199 200 201
			continue
		}

		collSchema := collection.schema
		// 1.2 Get Fields
		var pos int = 0 // Record position of blob
		for _, field := range collSchema.Fields {
			switch field.DataType {
G
godchen 已提交
202
			case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
203 204 205 206 207
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
X
XuanYang-cn 已提交
208
							log.Error("strconv wrong")
X
XuanYang-cn 已提交
209 210 211 212 213
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
214 215
					log.Error("invalid dim")
					continue
X
XuanYang-cn 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
236
							log.Error("binary.read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
237 238 239 240 241 242 243 244
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
245
			case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
246 247 248 249 250
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
X
XuanYang-cn 已提交
251
							log.Error("strconv wrong")
X
XuanYang-cn 已提交
252 253 254 255 256
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
257
					log.Error("invalid dim")
X
XuanYang-cn 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
X
xige-16 已提交
272
					bv := blob.GetValue()[pos : pos+(dim/8)]
X
XuanYang-cn 已提交
273 274 275 276 277 278
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
				fieldData.NumRows += len(msg.RowData)

G
godchen 已提交
279
			case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
280 281 282 283 284 285 286 287 288 289 290 291
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
292
						log.Error("binary.Read bool wrong", zap.Error(err))
X
XuanYang-cn 已提交
293 294 295 296 297 298 299
					}
					fieldData.Data = append(fieldData.Data, v)

				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
300
			case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
301 302 303 304 305 306 307 308 309 310 311 312
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
313
						log.Error("binary.Read int8 wrong", zap.Error(err))
X
XuanYang-cn 已提交
314 315 316 317 318 319
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
320
			case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
321 322 323 324 325 326 327 328 329 330 331 332
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
333
						log.Error("binary.Read int16 wrong", zap.Error(err))
X
XuanYang-cn 已提交
334 335 336 337 338 339
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
340
			case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
341 342 343 344 345 346 347 348 349 350 351 352
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
353
						log.Error("binary.Read int32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
354 355 356 357 358 359
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
360
			case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
383
							log.Error("binary.Read int64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
384 385 386 387 388 389 390
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
					fieldData.NumRows += len(msg.RowIDs)
				}

G
godchen 已提交
391
			case schemapb.DataType_Float:
X
XuanYang-cn 已提交
392 393 394 395 396 397 398 399 400 401 402 403
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
404
						log.Error("binary.Read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
405 406 407 408 409 410
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
411
			case schemapb.DataType_Double:
X
XuanYang-cn 已提交
412 413 414 415 416 417 418 419 420 421 422 423
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
424
						log.Error("binary.Read float64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
425 426 427 428 429 430 431 432 433 434 435 436
					}
					fieldData.Data = append(fieldData.Data, v)
				}

				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata

X
XuanYang-cn 已提交
437 438 439 440
		switch {
		case iMsg.endPositions == nil || len(iMsg.endPositions) <= 0:
			log.Error("insert Msg EndPosition empty")
		default:
Z
zhenshan.cao 已提交
441 442 443 444 445 446 447 448 449 450 451 452 453 454
			segment, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("get segment wrong", zap.Error(err))
			}
			var endPosition *internalpb.MsgPosition = nil
			for _, pos := range iMsg.endPositions {
				if pos.ChannelName == segment.channelName {
					endPosition = pos
				}
			}
			if endPosition == nil {
				log.Error("get position wrong", zap.Error(err))
			}
			ibNode.replica.setEndPosition(currentSegID, endPosition)
X
XuanYang-cn 已提交
455 456
		}

457
		// 1.4 if full, auto flush
X
XuanYang-cn 已提交
458
		if ibNode.insertBuffer.full(currentSegID) {
459 460
			log.Debug(". Insert Buffer full, auto flushing ",
				zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
X
XuanYang-cn 已提交
461

462
			collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
X
XuanYang-cn 已提交
463
			if err != nil {
464 465
				log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
				continue
X
XuanYang-cn 已提交
466
			}
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
			collMeta := &etcdpb.CollectionMeta{
				Schema: collSch,
				ID:     collection.GetID(),
			}

			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)

			finishCh := make(chan bool)
			go flushSegmentTxn(collMeta, currentSegID, msg.GetPartitionID(), collection.GetID(),
				&ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV,
				finishCh)

			go func(finishCh <-chan bool) {
				if finished := <-finishCh; !finished {
					log.Debug(".. Auto Flush failed ..")
					return
				}
				log.Debug(".. Auto Flush completed ..")
			}(finishCh)
X
XuanYang-cn 已提交
487 488 489 490
		}
	}

	if len(iMsg.insertMessages) > 0 {
X
XuanYang-cn 已提交
491
		log.Debug("---insert buffer status---")
X
XuanYang-cn 已提交
492 493 494
		var stopSign int = 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
X
XuanYang-cn 已提交
495
				log.Debug("......")
X
XuanYang-cn 已提交
496 497
				break
			}
X
XuanYang-cn 已提交
498
			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
X
XuanYang-cn 已提交
499 500 501 502
			stopSign++
		}
	}

X
XuanYang-cn 已提交
503
	// iMsg is Flush() msg from dataservice
X
XuanYang-cn 已提交
504 505
	//   1. insertBuffer(not empty) -> binLogs -> minIO/S3
	for _, msg := range iMsg.flushMessages {
X
XuanYang-cn 已提交
506
		for _, currentSegID := range msg.segmentIDs {
X
XuanYang-cn 已提交
507
			log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
X
XuanYang-cn 已提交
508

509 510 511 512 513 514 515
			finishCh := make(chan bool)
			go ibNode.completeFlush(currentSegID, finishCh)

			if ibNode.insertBuffer.size(currentSegID) <= 0 {
				log.Debug(".. Buffer empty ...")
				finishCh <- true
				continue
X
XuanYang-cn 已提交
516
			}
517 518 519 520

			log.Debug(".. Buffer not empty, flushing ..")
			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)
X
XuanYang-cn 已提交
521 522 523 524 525
			clearFn := func() {
				finishCh <- false
				log.Debug(".. Clearing flush Buffer ..")
				ibNode.flushMap.Delete(currentSegID)
			}
526 527 528 529

			seg, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
X
XuanYang-cn 已提交
530
				clearFn()
531 532 533 534
				continue
			}

			collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID)
535
			if err != nil {
536
				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
X
XuanYang-cn 已提交
537
				clearFn()
538
				continue
539
			}
540 541 542 543 544 545 546 547

			collMeta := &etcdpb.CollectionMeta{
				Schema: collSch,
				ID:     seg.collectionID,
			}

			go flushSegmentTxn(collMeta, currentSegID, seg.partitionID, seg.collectionID,
				&ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV, finishCh)
X
XuanYang-cn 已提交
548 549 550 551
		}
	}

	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
X
XuanYang-cn 已提交
552
		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
X
XuanYang-cn 已提交
553 554 555 556 557 558
	}

	var res Msg = &gcMsg{
		gcRecord:  iMsg.gcRecord,
		timeRange: iMsg.timeRange,
	}
559 560 561
	for _, sp := range spans {
		sp.Finish()
	}
X
XuanYang-cn 已提交
562

563
	return []Msg{res}
X
XuanYang-cn 已提交
564 565
}

func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
G
godchen 已提交
567
	insertData *sync.Map, meta *binlogMeta, kv kv.BaseKV, finishCh chan<- bool) {
X
XuanYang-cn 已提交
568

X
XuanYang-cn 已提交
569 570
	clearFn := func(isSuccess bool) {
		finishCh <- isSuccess
571 572
		log.Debug(".. Clearing flush Buffer ..")
		insertData.Delete(segID)
X
XuanYang-cn 已提交
573
	}
X
XuanYang-cn 已提交
574 575 576 577

	inCodec := storage.NewInsertCodec(collMeta)

	// buffer data to binlogs
578 579 580
	data, ok := insertData.Load(segID)
	if !ok {
		log.Error("Flush failed ... cannot load insertData ..")
X
XuanYang-cn 已提交
581
		clearFn(false)
582 583
		return
	}
X
XuanYang-cn 已提交
584

585
	binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
X
XuanYang-cn 已提交
586
	if err != nil {
587
		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
X
XuanYang-cn 已提交
588
		clearFn(false)
589
		return
X
XuanYang-cn 已提交
590 591
	}

592 593 594 595 596 597
	log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
	field2Path := make(map[UniqueID]string, len(binLogs))
	kvs := make(map[string]string, len(binLogs))
	paths := make([]string, 0, len(binLogs))
	for _, blob := range binLogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
X
XuanYang-cn 已提交
598
		if err != nil {
599
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
X
XuanYang-cn 已提交
600
			clearFn(false)
601
			return
X
XuanYang-cn 已提交
602 603
		}

604
		k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
X
XuanYang-cn 已提交
605
		if err != nil {
606
			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
X
XuanYang-cn 已提交
607
			clearFn(false)
608
			return
X
XuanYang-cn 已提交
609 610
		}

611 612 613 614 615 616 617 618 619 620
		key := path.Join(Params.InsertBinlogRootPath, k)
		paths = append(paths, key)
		kvs[key] = string(blob.Value[:])
		field2Path[fieldID] = key
	}

	err = kv.MultiSave(kvs)
	if err != nil {
		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
		_ = kv.MultiRemove(paths)
X
XuanYang-cn 已提交
621
		clearFn(false)
622 623
		return
	}
X
XuanYang-cn 已提交
624

625 626 627 628 629
	log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number", len(binLogs)))
	err = meta.SaveSegmentBinlogMetaTxn(segID, field2Path)
	if err != nil {
		log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
		_ = kv.MultiRemove(paths)
X
XuanYang-cn 已提交
630
		clearFn(false)
631
		return
X
XuanYang-cn 已提交
632
	}
633

X
XuanYang-cn 已提交
634
	clearFn(true)
X
XuanYang-cn 已提交
635 636
}

func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
	if finished := <-finishCh; !finished {
		return
	}

	log.Debug(".. Segment flush completed ..")
X
XuanYang-cn 已提交
643 644 645
	ibNode.replica.setIsFlushed(segID)
	ibNode.updateSegStatistics([]UniqueID{segID})

X
XuanYang-cn 已提交
646
	msgPack := msgstream.MsgPack{}
G
godchen 已提交
647
	completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
X
XuanYang-cn 已提交
648
		Base: &commonpb.MsgBase{
649
			MsgType:   commonpb.MsgType_SegmentFlushDone,
X
XuanYang-cn 已提交
650 651
			MsgID:     0, // GOOSE TODO
			Timestamp: 0, // GOOSE TODO
X
XuanYang-cn 已提交
652
			SourceID:  Params.NodeID,
X
XuanYang-cn 已提交
653 654 655 656 657 658 659 660 661 662 663
		},
		SegmentID: segID,
	}
	var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
		SegmentFlushCompletedMsg: completeFlushMsg,
	}

	msgPack.Msgs = append(msgPack.Msgs, msg)
664
	err := ibNode.completeFlushStream.Produce(&msgPack)
665 666 667
	if err != nil {
		log.Error(".. Produce complete flush msg failed ..", zap.Error(err))
	}
X
XuanYang-cn 已提交
668 669
}

func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	msgPack := msgstream.MsgPack{}
	timeTickMsg := msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
G
godchen 已提交
678
		TimeTickMsg: internalpb.TimeTickMsg{
X
XuanYang-cn 已提交
679
			Base: &commonpb.MsgBase{
680
				MsgType:   commonpb.MsgType_TimeTick,
X
XuanYang-cn 已提交
681 682
				MsgID:     0,  // GOOSE TODO
				Timestamp: ts, // GOOSE TODO
X
XuanYang-cn 已提交
683
				SourceID:  Params.NodeID,
X
XuanYang-cn 已提交
684 685 686 687
			},
		},
	}
	msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
688
	return ibNode.timeTickStream.Produce(&msgPack)
X
XuanYang-cn 已提交
689 690
}

func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
X
XuanYang-cn 已提交
692
	log.Debug("Updating segments statistics...")
G
godchen 已提交
693
	statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
694 695 696
	for _, segID := range segIDs {
		updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
		if err != nil {
X
XuanYang-cn 已提交
697
			log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
698 699 700 701 702
			continue
		}
		statsUpdates = append(statsUpdates, updates)
	}

G
godchen 已提交
703
	segStats := internalpb.SegmentStatistics{
704
		Base: &commonpb.MsgBase{
705
			MsgType:   commonpb.MsgType_SegmentStatistics,
706 707
			MsgID:     UniqueID(0),  // GOOSE TODO
			Timestamp: Timestamp(0), // GOOSE TODO
X
XuanYang-cn 已提交
708
			SourceID:  Params.NodeID,
709 710 711 712 713 714
		},
		SegStats: statsUpdates,
	}

	var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
		BaseMsg: msgstream.BaseMsg{
X
XuanYang-cn 已提交
715
			HashValues: []uint32{0}, // GOOSE TODO
716 717 718 719 720 721 722
		},
		SegmentStatistics: segStats,
	}

	var msgPack = msgstream.MsgPack{
		Msgs: []msgstream.TsMsg{msg},
	}
723
	return ibNode.segmentStatisticsStream.Produce(&msgPack)
724 725 726 727 728 729 730 731 732 733
}

// getCollectionSchemaByID looks the collection up in the replica and returns
// its schema, or the lookup error when the collection cannot be fetched.
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
	coll, lookupErr := ibNode.replica.getCollectionByID(collectionID)
	if lookupErr == nil {
		return coll.schema, nil
	}
	return nil, lookupErr
}

func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
	replica Replica, factory msgstream.Factory) *insertBufferNode {
X
XuanYang-cn 已提交
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism

	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	maxSize := Params.FlushInsertBufferSize
	iBuffer := &insertBuffer{
		insertData: make(map[UniqueID]*InsertData),
		maxSize:    maxSize,
	}

	// MinIO
	option := &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}

	minIOKV, err := miniokv.NewMinIOKV(ctx, option)
	if err != nil {
		panic(err)
	}

764
	//input stream, data node time tick
Z
zhenshan.cao 已提交
765 766
	wTt, _ := factory.NewMsgStream(ctx)
	wTt.AsProducer([]string{Params.TimeTickChannelName})
X
Xiangyu Wang 已提交
767
	log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
768
	var wTtMsgStream msgstream.MsgStream = wTt
X
XuanYang-cn 已提交
769
	wTtMsgStream.Start()
770 771

	// update statistics channel
Z
zhenshan.cao 已提交
772 773
	segS, _ := factory.NewMsgStream(ctx)
	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
X
Xiangyu Wang 已提交
774
	log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
775
	var segStatisticsMsgStream msgstream.MsgStream = segS
X
XuanYang-cn 已提交
776
	segStatisticsMsgStream.Start()
X
XuanYang-cn 已提交
777 778

	// segment flush completed channel
Z
zhenshan.cao 已提交
779 780
	cf, _ := factory.NewMsgStream(ctx)
	cf.AsProducer([]string{Params.CompleteFlushChannelName})
X
Xiangyu Wang 已提交
781
	log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
X
XuanYang-cn 已提交
782
	var completeFlushStream msgstream.MsgStream = cf
X
XuanYang-cn 已提交
783
	completeFlushStream.Start()
X
XuanYang-cn 已提交
784 785

	return &insertBufferNode{
X
XuanYang-cn 已提交
786 787 788 789 790
		BaseNode:                baseNode,
		insertBuffer:            iBuffer,
		minIOKV:                 minIOKV,
		timeTickStream:          wTtMsgStream,
		segmentStatisticsStream: segStatisticsMsgStream,
X
XuanYang-cn 已提交
791
		completeFlushStream:     completeFlushStream,
X
XuanYang-cn 已提交
792 793
		replica:                 replica,
		flushMeta:               flushMeta,
794
		flushMap:                sync.Map{},
X
XuanYang-cn 已提交
795 796
	}
}