// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package datanode

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"path"
	"strconv"
	"sync"
	"unsafe"

	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/kv"
	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/flowgraph"
	"github.com/milvus-io/milvus/internal/util/trace"
)

// Path prefixes for collection and segment entries.
// NOTE(review): neither constant is referenced in the code shown in this file;
// confirm where they are consumed before changing them.
const (
	CollectionPrefix = "/collection/"
	SegmentPrefix    = "/segment/"
)

// Aliases for the corresponding types of the storage package.
type (
	InsertData = storage.InsertData
	Blob       = storage.Blob
)
type insertBufferNode struct {
	BaseNode
	insertBuffer *insertBuffer
	replica      Replica
55
	idAllocator  allocatorInterface
56 57
	flushMap     sync.Map

G
godchen 已提交
58
	minIOKV kv.BaseKV
59 60 61 62 63 64 65 66 67 68

	timeTickStream          msgstream.MsgStream
	segmentStatisticsStream msgstream.MsgStream
	completeFlushStream     msgstream.MsgStream
}

// insertBuffer holds the insert data that has been received but not yet
// flushed, keyed by segment.
type insertBuffer struct {
	// insertData maps a segment ID to the rows buffered for that segment.
	insertData map[UniqueID]*InsertData // SegmentID to InsertData
	// maxSize is the threshold that full() compares size() against.
	maxSize    int32
}

// size returns the number of rows buffered for segmentID, measured as the
// largest NumRows among the segment's float-vector and binary-vector fields.
// It returns 0 when nothing is buffered for the segment or when the segment
// has no vector field data.
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
	// Looking up a nil or empty map simply reports "not found".
	buffered, found := ib.insertData[segmentID]
	if !found {
		return 0
	}

	var rows int32
	for _, column := range buffered.Data {
		var n int32
		switch vec := column.(type) {
		case *storage.FloatVectorFieldData:
			n = int32(vec.NumRows)
		case *storage.BinaryVectorFieldData:
			n = int32(vec.NumRows)
		default:
			// Only vector fields take part in the size computation.
			continue
		}
		if n > rows {
			rows = n
		}
	}
	return rows
}

func (ib *insertBuffer) full(segmentID UniqueID) bool {
S
sunby 已提交
96
	log.Debug("Segment size", zap.Any("segment", segmentID), zap.Int32("size", ib.size(segmentID)), zap.Int32("maxsize", ib.maxSize))
X
XuanYang-cn 已提交
97 98 99 100 101 102 103
	return ib.size(segmentID) >= ib.maxSize
}

// Name returns the fixed identifier of this flow graph node.
func (ibNode *insertBufferNode) Name() string {
	const nodeName = "ibNode"
	return nodeName
}

func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
X
XuanYang-cn 已提交
105 106

	if len(in) != 1 {
X
XuanYang-cn 已提交
107
		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
X
XuanYang-cn 已提交
108 109 110
		// TODO: add error handling
	}

111
	iMsg, ok := in[0].(*insertMsg)
X
XuanYang-cn 已提交
112
	if !ok {
X
XuanYang-cn 已提交
113
		log.Error("type assertion failed for insertMsg")
X
XuanYang-cn 已提交
114 115 116
		// TODO: add error handling
	}

B
bigsheeper 已提交
117
	if iMsg == nil {
118 119 120 121 122 123 124 125
		return []Msg{}
	}

	var spans []opentracing.Span
	for _, msg := range iMsg.insertMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
B
bigsheeper 已提交
126 127
	}

X
XuanYang-cn 已提交
128
	// Updating segment statistics
X
XuanYang-cn 已提交
129
	uniqueSeg := make(map[UniqueID]int64)
130
	for _, msg := range iMsg.insertMessages {
131

132
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
133 134 135
		collID := msg.GetCollectionID()
		partitionID := msg.GetPartitionID()

136
		if !ibNode.replica.hasSegment(currentSegID) {
X
XuanYang-cn 已提交
137
			err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
X
XuanYang-cn 已提交
138
			if err != nil {
X
XuanYang-cn 已提交
139
				log.Error("add segment wrong", zap.Error(err))
X
XuanYang-cn 已提交
140
			}
X
XuanYang-cn 已提交
141

142
			// set msg pack start positions
143
			ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
X
XuanYang-cn 已提交
144 145
		}

X
XuanYang-cn 已提交
146 147
		segNum := uniqueSeg[currentSegID]
		uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
148
	}
X
XuanYang-cn 已提交
149

150
	segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
X
XuanYang-cn 已提交
151
	for id, num := range uniqueSeg {
152
		segToUpdate = append(segToUpdate, id)
X
XuanYang-cn 已提交
153 154 155 156 157

		err := ibNode.replica.updateStatistics(id, num)
		if err != nil {
			log.Error("update Segment Row number wrong", zap.Error(err))
		}
158
	}
X
XuanYang-cn 已提交
159

160 161
	if len(segToUpdate) > 0 {
		err := ibNode.updateSegStatistics(segToUpdate)
X
XuanYang-cn 已提交
162 163
		if err != nil {
			log.Error("update segment statistics error", zap.Error(err))
X
XuanYang-cn 已提交
164
		}
165 166
	}

X
XuanYang-cn 已提交
167 168 169 170
	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
X
XuanYang-cn 已提交
171
			log.Error("misaligned messages detected")
X
XuanYang-cn 已提交
172 173 174
			continue
		}
		currentSegID := msg.GetSegmentID()
X
XuanYang-cn 已提交
175
		collectionID := msg.GetCollectionID()
X
XuanYang-cn 已提交
176 177 178 179 180 181 182 183

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

X
XuanYang-cn 已提交
184 185
		// 1.1 Get CollectionMeta
		collection, err := ibNode.replica.getCollectionByID(collectionID)
X
XuanYang-cn 已提交
186 187
		if err != nil {
			// GOOSE TODO add error handler
X
XuanYang-cn 已提交
188
			log.Error("Get meta wrong:", zap.Error(err))
X
XuanYang-cn 已提交
189 190 191 192 193 194
			continue
		}

		collSchema := collection.schema
		// 1.2 Get Fields
		var pos int = 0 // Record position of blob
195 196 197 198 199 200 201 202 203 204 205
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
		var fieldIDs []int64
		var fieldTypes []schemapb.DataType
		for _, field := range collSchema.Fields {
			fieldIDs = append(fieldIDs, field.FieldID)
			fieldTypes = append(fieldTypes, field.DataType)
		}

		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))

X
XuanYang-cn 已提交
206 207
		for _, field := range collSchema.Fields {
			switch field.DataType {
G
godchen 已提交
208
			case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
209 210 211 212 213
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
X
XuanYang-cn 已提交
214
							log.Error("strconv wrong")
X
XuanYang-cn 已提交
215 216 217 218 219
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
220 221
					log.Error("invalid dim")
					continue
X
XuanYang-cn 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
242
							log.Error("binary.read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
243 244 245 246 247 248 249 250
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
251
			case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
252 253 254 255 256
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
X
XuanYang-cn 已提交
257
							log.Error("strconv wrong")
X
XuanYang-cn 已提交
258 259 260 261 262
						}
						break
					}
				}
				if dim <= 0 {
X
XuanYang-cn 已提交
263
					log.Error("invalid dim")
X
XuanYang-cn 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
X
xige-16 已提交
278
					bv := blob.GetValue()[pos : pos+(dim/8)]
X
XuanYang-cn 已提交
279 280 281 282 283 284
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
				fieldData.NumRows += len(msg.RowData)

G
godchen 已提交
285
			case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
286 287 288 289 290 291 292 293 294 295 296 297
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
298
						log.Error("binary.Read bool wrong", zap.Error(err))
X
XuanYang-cn 已提交
299 300 301 302 303 304 305
					}
					fieldData.Data = append(fieldData.Data, v)

				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
306
			case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
307 308 309 310 311 312 313 314 315 316 317 318
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
319
						log.Error("binary.Read int8 wrong", zap.Error(err))
X
XuanYang-cn 已提交
320 321 322 323 324 325
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
326
			case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
327 328 329 330 331 332 333 334 335 336 337 338
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
339
						log.Error("binary.Read int16 wrong", zap.Error(err))
X
XuanYang-cn 已提交
340 341 342 343 344 345
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
346
			case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
347 348 349 350 351 352 353 354 355 356 357 358
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
359
						log.Error("binary.Read int32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
360 361 362 363 364 365
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
366
			case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
389
							log.Error("binary.Read int64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
390 391 392 393 394 395 396
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
					fieldData.NumRows += len(msg.RowIDs)
				}

G
godchen 已提交
397
			case schemapb.DataType_Float:
X
XuanYang-cn 已提交
398 399 400 401 402 403 404 405 406 407 408 409
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
410
						log.Error("binary.Read float32 wrong", zap.Error(err))
X
XuanYang-cn 已提交
411 412 413 414 415 416
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

G
godchen 已提交
417
			case schemapb.DataType_Double:
X
XuanYang-cn 已提交
418 419 420 421 422 423 424 425 426 427 428 429
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
X
XuanYang-cn 已提交
430
						log.Error("binary.Read float64 wrong", zap.Error(err))
X
XuanYang-cn 已提交
431 432 433 434 435 436 437 438 439 440 441 442
					}
					fieldData.Data = append(fieldData.Data, v)
				}

				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata

443
		// store current endPositions as Segment->EndPostion
444
		ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
X
XuanYang-cn 已提交
445 446 447
	}

	if len(iMsg.insertMessages) > 0 {
X
XuanYang-cn 已提交
448
		log.Debug("---insert buffer status---")
X
XuanYang-cn 已提交
449 450 451
		var stopSign int = 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
X
XuanYang-cn 已提交
452
				log.Debug("......")
X
XuanYang-cn 已提交
453 454
				break
			}
X
XuanYang-cn 已提交
455
			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
X
XuanYang-cn 已提交
456 457 458 459
			stopSign++
		}
	}

S
sunby 已提交
460 461 462 463 464
	for _, segToFlush := range segToUpdate {
		// If full, auto flush
		if ibNode.insertBuffer.full(segToFlush) {
			log.Debug(". Insert Buffer full, auto flushing ",
				zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))
X
XuanYang-cn 已提交
465

S
sunby 已提交
466 467 468 469 470
			collMeta, err := ibNode.getCollMetabySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
				continue
			}
471

S
sunby 已提交
472 473 474 475 476 477
			ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
			delete(ibNode.insertBuffer.insertData, segToFlush)

			collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
478
				continue
X
XuanYang-cn 已提交
479
			}
480

S
sunby 已提交
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
			finishCh := make(chan map[UniqueID]string)
			go flushSegment(collMeta, segToFlush, partitionID, collID,
				&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
			go ibNode.bufferAutoFlushPaths(finishCh, segToFlush)
		}
	}

	// iMsg is Flush() msg from dataservice
	//   1. insertBuffer(not empty) -> binLogs -> minIO/S3
	if iMsg.flushMessage != nil && ibNode.replica.hasSegment(iMsg.flushMessage.segmentID) {
		currentSegID := iMsg.flushMessage.segmentID
		log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))

		finishCh := make(chan map[UniqueID]string)
		go ibNode.completeFlush(currentSegID, finishCh, iMsg.flushMessage.dmlFlushedCh)

		if ibNode.insertBuffer.size(currentSegID) <= 0 {
			log.Debug(".. Buffer empty ...")
			finishCh <- make(map[UniqueID]string)
		} else {
501 502 503
			log.Debug(".. Buffer not empty, flushing ..")
			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)
X
XuanYang-cn 已提交
504
			clearFn := func() {
505
				finishCh <- nil
X
XuanYang-cn 已提交
506 507 508
				log.Debug(".. Clearing flush Buffer ..")
				ibNode.flushMap.Delete(currentSegID)
			}
509

S
sunby 已提交
510 511
			var collMeta *etcdpb.CollectionMeta
			var collSch *schemapb.CollectionSchema
512 513 514
			seg, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
X
XuanYang-cn 已提交
515
				clearFn()
S
sunby 已提交
516
				// TODO add error handling
517 518
			}

S
sunby 已提交
519
			collSch, err = ibNode.getCollectionSchemaByID(seg.collectionID)
520
			if err != nil {
521
				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
X
XuanYang-cn 已提交
522
				clearFn()
S
sunby 已提交
523
				// TODO add error handling
524
			}
525

S
sunby 已提交
526
			collMeta = &etcdpb.CollectionMeta{
527 528 529 530
				Schema: collSch,
				ID:     seg.collectionID,
			}

531 532 533 534
			go flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
				&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
		}

X
XuanYang-cn 已提交
535 536 537
	}

	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
X
XuanYang-cn 已提交
538
		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
X
XuanYang-cn 已提交
539 540 541 542 543 544
	}

	var res Msg = &gcMsg{
		gcRecord:  iMsg.gcRecord,
		timeRange: iMsg.timeRange,
	}
545 546 547
	for _, sp := range spans {
		sp.Finish()
	}
X
XuanYang-cn 已提交
548

549
	return []Msg{res}
X
XuanYang-cn 已提交
550 551
}

func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
	insertData *sync.Map, kv kv.BaseKV, field2PathCh chan<- map[UniqueID]string, idAllocator allocatorInterface) {
X
XuanYang-cn 已提交
554

X
XuanYang-cn 已提交
555
	clearFn := func(isSuccess bool) {
556 557 558 559
		if !isSuccess {
			field2PathCh <- nil
		}

560 561
		log.Debug(".. Clearing flush Buffer ..")
		insertData.Delete(segID)
X
XuanYang-cn 已提交
562
	}
X
XuanYang-cn 已提交
563 564 565 566

	inCodec := storage.NewInsertCodec(collMeta)

	// buffer data to binlogs
567 568 569
	data, ok := insertData.Load(segID)
	if !ok {
		log.Error("Flush failed ... cannot load insertData ..")
X
XuanYang-cn 已提交
570
		clearFn(false)
571 572
		return
	}
X
XuanYang-cn 已提交
573

574
	binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
X
XuanYang-cn 已提交
575
	if err != nil {
576
		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
X
XuanYang-cn 已提交
577
		clearFn(false)
578
		return
X
XuanYang-cn 已提交
579 580
	}

581 582 583 584
	log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
	field2Path := make(map[UniqueID]string, len(binLogs))
	kvs := make(map[string]string, len(binLogs))
	paths := make([]string, 0, len(binLogs))
585 586 587
	field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))

	// write insert binlog
588 589
	for _, blob := range binLogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
X
XuanYang-cn 已提交
590
		if err != nil {
591
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
X
XuanYang-cn 已提交
592
			clearFn(false)
593
			return
X
XuanYang-cn 已提交
594 595
		}

596
		logidx, err := idAllocator.allocID()
X
XuanYang-cn 已提交
597
		if err != nil {
598
			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
X
XuanYang-cn 已提交
599
			clearFn(false)
600
			return
X
XuanYang-cn 已提交
601 602
		}

603 604 605
		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

606 607 608 609
		key := path.Join(Params.InsertBinlogRootPath, k)
		paths = append(paths, key)
		kvs[key] = string(blob.Value[:])
		field2Path[fieldID] = key
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628
		field2Logidx[fieldID] = logidx
	}

	// write stats binlog
	for _, blob := range statsBinlogs {
		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
		if err != nil {
			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
			clearFn(false)
			return
		}

		logidx := field2Logidx[fieldID]

		// no error raise if alloc=false
		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)

		key := path.Join(Params.StatsBinlogRootPath, k)
		kvs[key] = string(blob.Value[:])
629 630 631 632 633 634
	}

	err = kv.MultiSave(kvs)
	if err != nil {
		log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
		_ = kv.MultiRemove(paths)
X
XuanYang-cn 已提交
635
		clearFn(false)
636 637
		return
	}
X
XuanYang-cn 已提交
638

639 640 641 642 643 644 645 646
	field2PathCh <- field2Path
	clearFn(true)
}

func (ibNode *insertBufferNode) bufferAutoFlushPaths(wait <-chan map[UniqueID]string, segID UniqueID) error {
	field2Path := <-wait
	if field2Path == nil {
		return errors.New("Nil field2Path")
X
XuanYang-cn 已提交
647
	}
648

649
	return ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
X
XuanYang-cn 已提交
650 651
}

func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string, dmlFlushedCh chan<- []*datapb.ID2PathList) {
653 654 655
	field2Path := <-wait

	if field2Path == nil {
Y
yangxuan 已提交
656
		dmlFlushedCh <- nil
657 658 659 660 661 662 663
		return
	}

	ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
	bufferField2Paths, err := ibNode.replica.getBufferPaths(segID)
	if err != nil {
		log.Error("Flush failed ... cannot get buffered paths", zap.Error(err))
Y
yangxuan 已提交
664
		dmlFlushedCh <- nil
665 666
	}

Y
yangxuan 已提交
667 668 669 670 671 672 673 674 675 676 677
	binlogPaths := make([]*datapb.ID2PathList, 0, len(bufferField2Paths))
	for k, paths := range bufferField2Paths {

		binlogPaths = append(binlogPaths, &datapb.ID2PathList{
			ID:    k,
			Paths: paths,
		})
	}

	dmlFlushedCh <- binlogPaths

678
	log.Debug(".. Segment flush completed ..")
X
XuanYang-cn 已提交
679 680
	ibNode.updateSegStatistics([]UniqueID{segID})

X
XuanYang-cn 已提交
681 682
}

func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	msgPack := msgstream.MsgPack{}
	timeTickMsg := msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
G
godchen 已提交
691
		TimeTickMsg: internalpb.TimeTickMsg{
X
XuanYang-cn 已提交
692
			Base: &commonpb.MsgBase{
693
				MsgType:   commonpb.MsgType_TimeTick,
X
XuanYang-cn 已提交
694 695
				MsgID:     0,  // GOOSE TODO
				Timestamp: ts, // GOOSE TODO
X
XuanYang-cn 已提交
696
				SourceID:  Params.NodeID,
X
XuanYang-cn 已提交
697 698 699 700
			},
		},
	}
	msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
701
	return ibNode.timeTickStream.Produce(&msgPack)
X
XuanYang-cn 已提交
702 703
}

func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
X
XuanYang-cn 已提交
705
	log.Debug("Updating segments statistics...")
G
godchen 已提交
706
	statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
707 708 709
	for _, segID := range segIDs {
		updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
		if err != nil {
X
XuanYang-cn 已提交
710
			log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
711 712 713 714 715
			continue
		}
		statsUpdates = append(statsUpdates, updates)
	}

G
godchen 已提交
716
	segStats := internalpb.SegmentStatistics{
717
		Base: &commonpb.MsgBase{
718
			MsgType:   commonpb.MsgType_SegmentStatistics,
719 720
			MsgID:     UniqueID(0),  // GOOSE TODO
			Timestamp: Timestamp(0), // GOOSE TODO
X
XuanYang-cn 已提交
721
			SourceID:  Params.NodeID,
722 723 724 725 726 727
		},
		SegStats: statsUpdates,
	}

	var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
		BaseMsg: msgstream.BaseMsg{
X
XuanYang-cn 已提交
728
			HashValues: []uint32{0}, // GOOSE TODO
729 730 731 732 733 734 735
		},
		SegmentStatistics: segStats,
	}

	var msgPack = msgstream.MsgPack{
		Msgs: []msgstream.TsMsg{msg},
	}
736
	return ibNode.segmentStatisticsStream.Produce(&msgPack)
737 738 739 740 741 742 743 744 745 746
}

// getCollectionSchemaByID looks the collection up in the replica and returns
// its schema, or the lookup error when the collection cannot be retrieved.
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
	coll, lookupErr := ibNode.replica.getCollectionByID(collectionID)
	if lookupErr == nil {
		return coll.schema, nil
	}
	return nil, lookupErr
}

func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID) (meta *etcdpb.CollectionMeta, err error) {
	ret, err := ibNode.replica.getSegmentByID(segmentID)
	if err != nil {
		return
	}
752
	meta = &etcdpb.CollectionMeta{}
753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
	meta.ID = ret.collectionID

	coll, err := ibNode.replica.getCollectionByID(ret.collectionID)
	if err != nil {
		return
	}
	meta.Schema = coll.GetSchema()
	return
}

// getCollectionandPartitionIDbySegID returns the collection ID and partition
// ID recorded in the replica for segmentID. When the segment lookup fails,
// both IDs are zero and the lookup error is returned.
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
	segment, lookupErr := ibNode.replica.getSegmentByID(segmentID)
	if lookupErr != nil {
		return collID, partitionID, lookupErr
	}
	return segment.collectionID, segment.partitionID, nil
}

func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {
X
XuanYang-cn 已提交
774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801
	maxQueueLength := Params.FlowGraphMaxQueueLength
	maxParallelism := Params.FlowGraphMaxParallelism

	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)

	maxSize := Params.FlushInsertBufferSize
	iBuffer := &insertBuffer{
		insertData: make(map[UniqueID]*InsertData),
		maxSize:    maxSize,
	}

	// MinIO
	option := &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	}

	minIOKV, err := miniokv.NewMinIOKV(ctx, option)
	if err != nil {
		panic(err)
	}

802
	//input stream, data node time tick
Z
zhenshan.cao 已提交
803 804
	wTt, _ := factory.NewMsgStream(ctx)
	wTt.AsProducer([]string{Params.TimeTickChannelName})
X
Xiangyu Wang 已提交
805
	log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
806
	var wTtMsgStream msgstream.MsgStream = wTt
X
XuanYang-cn 已提交
807
	wTtMsgStream.Start()
808 809

	// update statistics channel
Z
zhenshan.cao 已提交
810 811
	segS, _ := factory.NewMsgStream(ctx)
	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
X
Xiangyu Wang 已提交
812
	log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
813
	var segStatisticsMsgStream msgstream.MsgStream = segS
X
XuanYang-cn 已提交
814
	segStatisticsMsgStream.Start()
X
XuanYang-cn 已提交
815 816

	// segment flush completed channel
Z
zhenshan.cao 已提交
817 818
	cf, _ := factory.NewMsgStream(ctx)
	cf.AsProducer([]string{Params.CompleteFlushChannelName})
X
Xiangyu Wang 已提交
819
	log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
X
XuanYang-cn 已提交
820
	var completeFlushStream msgstream.MsgStream = cf
X
XuanYang-cn 已提交
821
	completeFlushStream.Start()
X
XuanYang-cn 已提交
822 823

	return &insertBufferNode{
X
XuanYang-cn 已提交
824 825 826 827 828
		BaseNode:                baseNode,
		insertBuffer:            iBuffer,
		minIOKV:                 minIOKV,
		timeTickStream:          wTtMsgStream,
		segmentStatisticsStream: segStatisticsMsgStream,
X
XuanYang-cn 已提交
829
		completeFlushStream:     completeFlushStream,
X
XuanYang-cn 已提交
830
		replica:                 replica,
831 832
		flushMap:                sync.Map{},
		idAllocator:             idAllocator,
X
XuanYang-cn 已提交
833 834
	}
}