// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importutil

import (
	"bufio"
	"context"
	"fmt"
	"strconv"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
	"github.com/milvus-io/milvus/internal/querycoordv2/params"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/timerecord"
)

const (
	JSONFileExt  = ".json"
	NumpyFileExt = ".npy"

	// supposed size of a single block, used to control binlog file size; the max binlog file size is no more than 2*SingleBlockSize
	SingleBlockSize = 16 * 1024 * 1024 // 16MB

	// this limitation is to avoid an OOM risk:
	// sometimes the system's max segment size is a large number, and a single segment's field data might cause OOM.
	// flush the segment when its data reaches this limitation, and let the compaction compact it later.
	MaxSegmentSizeInMemory = 512 * 1024 * 1024 // 512MB

	// this limitation is to avoid an OOM risk:
	// if the shard number is large, although each single segment is small, there are lots of in-memory segments,
	// and the total memory size might cause OOM.
	MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB

	// progress percent value of the persist state
	ProgressValueForPersist = 90

	// keys of import task information
	FailedReason    = "failed_reason"
	Files           = "files"
	CollectionName  = "collection"
	PartitionName   = "partition"
	PersistTimeCost = "persist_cost"
	ProgressPercent = "progress_percent"
)

// ReportImportAttempts is the maximum # of attempts to retry when import fails.
var ReportImportAttempts uint = 10

type ImportFlushFunc func(fields BlockData, shardID int, partID int64) error
type AssignSegmentFunc func(shardID int, partID int64) (int64, string, error)
type CreateBinlogsFunc func(fields BlockData, segmentID int64, partID int64) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error)
type SaveSegmentFunc func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64, partID int64) error
type ReportFunc func(res *rootcoordpb.ImportResult) error

type WorkingSegment struct {
	segmentID    int64                 // segment ID
	shardID      int                   // shard ID
	partitionID  int64                 // partition ID
	targetChName string                // target dml channel
	rowCount     int64                 // accumulated row count
	memSize      int                   // total memory size of all binlogs
	fieldsInsert []*datapb.FieldBinlog // persisted binlogs
	fieldsStats  []*datapb.FieldBinlog // stats of persisted binlogs
}

type ImportWrapper struct {
	ctx            context.Context        // for canceling the parse process
	cancel         context.CancelFunc     // for canceling the parse process
	collectionInfo *CollectionInfo        // collection details including schema
	segmentSize    int64                  // maximum size of a segment (unit: byte) defined by dataCoord.segment.maxSize (milvus.yml)
	rowIDAllocator *allocator.IDAllocator // autoid allocator
	chunkManager   storage.ChunkManager

	assignSegmentFunc AssignSegmentFunc // function to prepare a new segment
	createBinlogsFunc CreateBinlogsFunc // function to create binlogs for a segment
	saveSegmentFunc   SaveSegmentFunc   // function to persist a segment

	importResult         *rootcoordpb.ImportResult // import result
	reportFunc           ReportFunc                // report import state to rootcoord
	reportImportAttempts uint                      // attempt count if the report function gets an error

	workingSegments map[int]map[int64]*WorkingSegment // two-level map of shard ID and partition ID to working segments
	progressPercent int64                             // working progress percent
}

func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segmentSize int64,
	idAlloc *allocator.IDAllocator, cm storage.ChunkManager, importResult *rootcoordpb.ImportResult,
	reportFunc func(res *rootcoordpb.ImportResult) error) *ImportWrapper {
	if collectionInfo == nil || collectionInfo.Schema == nil {
		log.Warn("import wrapper: collection schema is nil")
		return nil
	}
	log.Info("import wrapper: collection info", zap.Int32("ShardNum", collectionInfo.ShardNum),
		zap.Int("PartitionsNum", len(collectionInfo.PartitionIDs)), zap.Any("Fields", collectionInfo.Name2FieldID))

	params.Params.InitOnce()
	ctx, cancel := context.WithCancel(ctx)

	wrapper := &ImportWrapper{
		ctx:                  ctx,
		cancel:               cancel,
		collectionInfo:       collectionInfo,
		segmentSize:          segmentSize,
		rowIDAllocator:       idAlloc,
		chunkManager:         cm,
		importResult:         importResult,
		reportFunc:           reportFunc,
		reportImportAttempts: ReportImportAttempts,
		workingSegments:      make(map[int]map[int64]*WorkingSegment),
	}

	return wrapper
}

func (p *ImportWrapper) SetCallbackFunctions(assignSegmentFunc AssignSegmentFunc, createBinlogsFunc CreateBinlogsFunc, saveSegmentFunc SaveSegmentFunc) error {
	if assignSegmentFunc == nil {
		log.Warn("import wrapper: callback function AssignSegmentFunc is nil")
		return fmt.Errorf("callback function AssignSegmentFunc is nil")
	}

	if createBinlogsFunc == nil {
		log.Warn("import wrapper: callback function CreateBinlogsFunc is nil")
		return fmt.Errorf("callback function CreateBinlogsFunc is nil")
	}

	if saveSegmentFunc == nil {
		log.Warn("import wrapper: callback function SaveSegmentFunc is nil")
		return fmt.Errorf("callback function SaveSegmentFunc is nil")
	}

	p.assignSegmentFunc = assignSegmentFunc
	p.createBinlogsFunc = createBinlogsFunc
	p.saveSegmentFunc = saveSegmentFunc
	return nil
}
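
// NOTE: a minimal usage sketch, not part of the original file. It shows how a
// caller might wire an ImportWrapper; the callback bodies are placeholders, and
// the surrounding variables (ctx, collectionInfo, idAlloc, cm, importResult,
// reportFunc) are assumed to exist:
//
//	wrapper := NewImportWrapper(ctx, collectionInfo, 512*1024*1024, idAlloc, cm, importResult, reportFunc)
//	err := wrapper.SetCallbackFunctions(
//		func(shardID int, partID int64) (int64, string, error) {
//			return 1001, "dml-channel-0", nil // assign a segment ID and target channel (hypothetical values)
//		},
//		func(fields BlockData, segmentID int64, partID int64) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) {
//			return nil, nil, nil // persist the block and return insert/stats binlogs
//		},
//		func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog,
//			segmentID int64, targetChName string, rowCount int64, partID int64) error {
//			return nil // notify the system to seal the segment
//		},
//	)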

// Cancel can be used to cancel the parse process
func (p *ImportWrapper) Cancel() error {
	p.cancel()
	return nil
}

// fileValidation verifies the input paths.
// If all the files are JSON type, return true.
// If all the files are numpy type, return false; duplicate file names are not allowed.
func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
	// use this map to check duplicate file names (only for numpy files)
	fileNames := make(map[string]struct{})

	totalSize := int64(0)
	rowBased := false
	for i := 0; i < len(filePaths); i++ {
		filePath := filePaths[i]
		name, fileType := GetFileNameAndExt(filePath)

		// only allow json file or numpy file
		if fileType != JSONFileExt && fileType != NumpyFileExt {
			log.Warn("import wrapper: unsupported file type", zap.String("filePath", filePath))
			return false, fmt.Errorf("unsupported file type: '%s'", filePath)
		}

		// we use the first file to determine row-based or column-based
		if i == 0 && fileType == JSONFileExt {
			rowBased = true
		}

		// check file type
		// row-based only supports json type, column-based only supports numpy type
		if rowBased {
			if fileType != JSONFileExt {
				log.Warn("import wrapper: unsupported file type for row-based mode", zap.String("filePath", filePath))
				return rowBased, fmt.Errorf("unsupported file type for row-based mode: '%s'", filePath)
			}
		} else {
			if fileType != NumpyFileExt {
				log.Warn("import wrapper: unsupported file type for column-based mode", zap.String("filePath", filePath))
				return rowBased, fmt.Errorf("unsupported file type for column-based mode: '%s'", filePath)
			}
		}

		// check duplicate file
		_, ok := fileNames[name]
		if ok {
			log.Warn("import wrapper: duplicate file name", zap.String("filePath", filePath))
			return rowBased, fmt.Errorf("duplicate file: '%s'", filePath)
		}
		fileNames[name] = struct{}{}

		// check file size, a single file size cannot exceed ImportMaxFileSize
		size, err := p.chunkManager.Size(p.ctx, filePath)
		if err != nil {
			log.Warn("import wrapper: failed to get file size", zap.String("filePath", filePath), zap.Error(err))
			return rowBased, fmt.Errorf("failed to get file size of '%s', error: %w", filePath, err)
		}

		// empty file
		if size == 0 {
			log.Warn("import wrapper: file size is zero", zap.String("filePath", filePath))
			return rowBased, fmt.Errorf("the file '%s' size is zero", filePath)
		}

		if size > params.Params.CommonCfg.ImportMaxFileSize {
			log.Warn("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
				zap.Int64("fileSize", size), zap.Int64("MaxFileSize", params.Params.CommonCfg.ImportMaxFileSize))
			return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, params.Params.CommonCfg.ImportMaxFileSize)
		}
		totalSize += size
	}

	return rowBased, nil
}
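
// NOTE: a minimal sketch of the two accepted input shapes, not part of the
// original file; the file names are hypothetical:
//
//	// row-based: all files are JSON, returns (true, nil)
//	rowBased, err := p.fileValidation([]string{"batch1.json", "batch2.json"})
//
//	// column-based: all files are numpy, one per field, duplicate names rejected,
//	// returns (false, nil)
//	rowBased, err = p.fileValidation([]string{"id.npy", "vector.npy"})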

// Import is the entry point of the import operation
// filePath and rowBased are from ImportTask
// if onlyValidate is true, this process only does validation; no data is generated and flushFunc will not be called
func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error {
	log.Info("import wrapper: begin import", zap.Any("filePaths", filePaths), zap.Any("options", options))

	// data restore function to import milvus native binlog files (for backup/restore tools)
	// the backup/restore tool provides two paths for a partition: the first is the binlog path, the second is the deltalog path
	if options.IsBackup && p.isBinlogImport(filePaths) {
		return p.doBinlogImport(filePaths, options.TsStartPoint, options.TsEndPoint)
	}

	// normal logic for importing general data files
	rowBased, err := p.fileValidation(filePaths)
	if err != nil {
		return err
	}

	tr := timerecord.NewTimeRecorder("Import task")
	if rowBased {
		// parse and consume row-based files
		// for row-based files, the JSONRowConsumer will generate autoid for the primary key, and split rows into segments
		// according to the shard number, so the flushFunc will be called in the JSONRowConsumer
		for i := 0; i < len(filePaths); i++ {
			filePath := filePaths[i]
			_, fileType := GetFileNameAndExt(filePath)
			log.Info("import wrapper: row-based file", zap.Any("filePath", filePath), zap.Any("fileType", fileType))

			if fileType == JSONFileExt {
				err = p.parseRowBasedJSON(filePath, options.OnlyValidate)
				if err != nil {
					log.Warn("import wrapper: failed to parse row-based json file", zap.Error(err), zap.String("filePath", filePath))
					return err
				}
			} // no need to check else, since fileValidation() already does this

			// trigger gc after each file is finished
			triggerGC()
		}
	} else {
		// parse and consume column-based files (currently support numpy)
		// for column-based files, the NumpyParser will generate autoid for the primary key, and split rows into segments
		// according to the shard number, so the flushFunc will be called in the NumpyParser
		flushFunc := func(fields BlockData, shardID int, partitionID int64) error {
			printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
			return p.flushFunc(fields, shardID, partitionID)
		}
		parser, err := NewNumpyParser(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize,
			p.chunkManager, flushFunc, p.updateProgressPercent)
		if err != nil {
			return err
		}

		err = parser.Parse(filePaths)
		if err != nil {
			return err
		}

		p.importResult.AutoIds = append(p.importResult.AutoIds, parser.IDRange()...)

		// trigger gc after parse finished
		triggerGC()
	}

	return p.reportPersisted(p.reportImportAttempts, tr)
}
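
// NOTE: a minimal sketch of driving an import, not part of the original file.
// The paths and option values are hypothetical; the ImportOptions fields are the
// ones referenced above (OnlyValidate, IsBackup, TsStartPoint, TsEndPoint):
//
//	// validate only, no binlogs generated
//	err := wrapper.Import([]string{"batch1.json"}, ImportOptions{OnlyValidate: true})
//
//	// real import of column-based numpy files
//	err = wrapper.Import([]string{"id.npy", "vector.npy"}, ImportOptions{})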

// reportPersisted notifies the rootcoord to mark the task state as ImportPersisted
func (p *ImportWrapper) reportPersisted(reportAttempts uint, tr *timerecord.TimeRecorder) error {
	// force close all segments
	err := p.closeAllWorkingSegments()
	if err != nil {
		return err
	}

	if tr != nil {
		ts := tr.Elapse("persist finished").Seconds()
		p.importResult.Infos = append(p.importResult.Infos,
			&commonpb.KeyValuePair{Key: PersistTimeCost, Value: strconv.FormatFloat(ts, 'f', 2, 64)})
	}

	// report file process state
	p.importResult.State = commonpb.ImportState_ImportPersisted
	progressValue := strconv.Itoa(ProgressValueForPersist)
	UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)

	log.Info("import wrapper: report import result", zap.Any("importResult", p.importResult))
	// the persist state is valuable; retry more times to avoid failing the task only because of a network error
	reportErr := retry.Do(p.ctx, func() error {
		return p.reportFunc(p.importResult)
	}, retry.Attempts(reportAttempts))
	if reportErr != nil {
		log.Warn("import wrapper: fail to report import state to RootCoord", zap.Error(reportErr))
		return reportErr
	}
	return nil
}

// isBinlogImport judges whether this is a binlog import operation
// For internal usage by the restore tool: https://github.com/zilliztech/milvus-backup
// This tool exports data from a milvus service, and calls the bulkload interface to import native data into another milvus service.
// This tool provides two paths: one is the insert log path of a partition, the other is the delta log path of this partition.
// This method checks the filePaths: if the paths exist and are not files, we treat it as a native import.
func (p *ImportWrapper) isBinlogImport(filePaths []string) bool {
	// must contain the insert log path; the delta log path is optional and can be an empty string
	if len(filePaths) != 2 {
		log.Info("import wrapper: paths count is not 2, not binlog import", zap.Int("len", len(filePaths)))
		return false
	}

	checkFunc := func(filePath string) bool {
		// contains a file extension, so it is not a path
		_, fileType := GetFileNameAndExt(filePath)
		if len(fileType) != 0 {
			log.Info("import wrapper: not a path, not binlog import", zap.String("filePath", filePath), zap.String("fileType", fileType))
			return false
		}
		return true
	}

	// the first path is the insert log path
	filePath := filePaths[0]
	if len(filePath) == 0 {
		log.Info("import wrapper: the first path is an empty string, not binlog import")
		return false
	}

	if !checkFunc(filePath) {
		return false
	}

	// the second path is the delta log path
	filePath = filePaths[1]
	if len(filePath) > 0 && !checkFunc(filePath) {
		return false
	}

	log.Info("import wrapper: do binlog import")
	return true
}
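
// NOTE: a minimal sketch of the two-path input expected by isBinlogImport, not
// part of the original file; the concrete paths are hypothetical:
//
//	paths := []string{
//		"backup/insert_log/445/446/", // insert log path of a partition (required)
//		"backup/delta_log/445/446/",  // delta log path (optional, may be "")
//	}
//	ok := p.isBinlogImport(paths) // true: two paths, neither has a file extension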

// doBinlogImport is the entry point of the binlog import operation
func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64, tsEndPoint uint64) error {
	tr := timerecord.NewTimeRecorder("Import task")

	flushFunc := func(fields BlockData, shardID int, partitionID int64) error {
		printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
		return p.flushFunc(fields, shardID, partitionID)
	}
	parser, err := NewBinlogParser(p.ctx, p.collectionInfo, SingleBlockSize,
		p.chunkManager, flushFunc, p.updateProgressPercent, tsStartPoint, tsEndPoint)
	if err != nil {
		return err
	}

	err = parser.Parse(filePaths)
	if err != nil {
		return err
	}

	return p.reportPersisted(p.reportImportAttempts, tr)
}

// parseRowBasedJSON is the entry point of the row-based json import operation
func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) error {
	tr := timerecord.NewTimeRecorder("json row-based parser: " + filePath)

	// for minio storage, the chunkManager downloads the file into local memory
	// for local storage, the chunkManager opens the file directly
	file, err := p.chunkManager.Reader(p.ctx, filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	size, err := p.chunkManager.Size(p.ctx, filePath)
	if err != nil {
		return err
	}

	// parse file
	reader := bufio.NewReader(file)
	parser := NewJSONParser(p.ctx, p.collectionInfo, p.updateProgressPercent)

	// if only validating, we provide an empty flushFunc so that the consumer does nothing but validation
	var flushFunc ImportFlushFunc
	if onlyValidate {
		flushFunc = func(fields BlockData, shardID int, partitionID int64) error {
			return nil
		}
	} else {
		flushFunc = func(fields BlockData, shardID int, partitionID int64) error {
			var filePaths = []string{filePath}
			printFieldsDataInfo(fields, "import wrapper: prepare to flush binlogs", filePaths)
			return p.flushFunc(fields, shardID, partitionID)
		}
	}

	consumer, err := NewJSONRowConsumer(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize, flushFunc)
	if err != nil {
		return err
	}

	err = parser.ParseRows(&IOReader{r: reader, fileSize: size}, consumer)
	if err != nil {
		return err
	}

	// for row-based files, auto-id is generated within JSONRowConsumer
	p.importResult.AutoIds = append(p.importResult.AutoIds, consumer.IDRange()...)

	tr.Elapse("parsed")
	return nil
}
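
// NOTE: a minimal sketch of the row-based JSON layout consumed above, not part
// of the original file; the field names are hypothetical and must match the
// collection schema:
//
//	{
//	    "rows": [
//	        {"id": 1, "vector": [0.1, 0.2]},
//	        {"id": 2, "vector": [0.3, 0.4]}
//	    ]
//	}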

// flushFunc is the callback function for parsers to generate segments and save binlog files
func (p *ImportWrapper) flushFunc(fields BlockData, shardID int, partitionID int64) error {
	logFields := []zap.Field{
		zap.Int("shardID", shardID),
		zap.Int64("partitionID", partitionID),
	}

	// if fields data is empty, do nothing
	rowNum := 0
	memSize := 0
	for _, field := range fields {
		rowNum = field.RowNum()
		memSize += field.GetMemorySize()
	}
	if rowNum <= 0 {
		log.Warn("import wrapper: fields data is empty", logFields...)
		return nil
	}

	logFields = append(logFields, zap.Int("rowNum", rowNum), zap.Int("memSize", memSize))
	log.Info("import wrapper: flush block data to binlog", logFields...)

	// if there is no segment for this shard, create a new one
	// if the segment exists and its size almost exceeds segmentSize, close it and create a new one
	var segment *WorkingSegment
	if shard, ok := p.workingSegments[shardID]; ok {
		if segmentTemp, exists := shard[partitionID]; exists {
			log.Info("import wrapper: compare working segment memSize with segmentSize",
				zap.Int("memSize", segmentTemp.memSize), zap.Int64("segmentSize", p.segmentSize))
			if int64(segmentTemp.memSize)+int64(memSize) >= p.segmentSize {
				// the segment already exists; its size exceeds (or almost exceeds) segmentSize, so close the segment
				err := p.closeWorkingSegment(segmentTemp)
				if err != nil {
					logFields = append(logFields, zap.Error(err))
					log.Warn("import wrapper: failed to close working segment", logFields...)
					return err
				}
				p.workingSegments[shardID][partitionID] = nil
			} else {
				// the existing segment is small, no need to close it
				segment = segmentTemp
			}
		}
	} else {
		p.workingSegments[shardID] = make(map[int64]*WorkingSegment)
	}

	if segment == nil {
		// create a new segment
		segID, channelName, err := p.assignSegmentFunc(shardID, partitionID)
		if err != nil {
			logFields = append(logFields, zap.Error(err))
			log.Warn("import wrapper: failed to assign a new segment", logFields...)
			return fmt.Errorf("failed to assign a new segment for shard id %d, error: %w", shardID, err)
		}

		segment = &WorkingSegment{
			segmentID:    segID,
			shardID:      shardID,
			partitionID:  partitionID,
			targetChName: channelName,
			rowCount:     int64(0),
			memSize:      0,
			fieldsInsert: make([]*datapb.FieldBinlog, 0),
			fieldsStats:  make([]*datapb.FieldBinlog, 0),
		}
		p.workingSegments[shardID][partitionID] = segment
	}

	// save binlogs
	fieldsInsert, fieldsStats, err := p.createBinlogsFunc(fields, segment.segmentID, partitionID)
	if err != nil {
		logFields = append(logFields, zap.Error(err), zap.Int64("segmentID", segment.segmentID),
			zap.String("targetChannel", segment.targetChName))
		log.Warn("import wrapper: failed to save binlogs", logFields...)
		return fmt.Errorf("failed to save binlogs, shard id %d, segment id %d, channel '%s', error: %w",
			shardID, segment.segmentID, segment.targetChName, err)
	}

	segment.fieldsInsert = append(segment.fieldsInsert, fieldsInsert...)
	segment.fieldsStats = append(segment.fieldsStats, fieldsStats...)
	segment.rowCount += int64(rowNum)
	segment.memSize += memSize

	// report working progress percent value to rootcoord
	// if the report fails, ignore the error; the percent value might be improper but the task can still succeed
	progressValue := strconv.Itoa(int(p.progressPercent))
	UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)
	reportErr := retry.Do(p.ctx, func() error {
		return p.reportFunc(p.importResult)
	}, retry.Attempts(p.reportImportAttempts))
	if reportErr != nil {
		logFields = append(logFields, zap.Error(reportErr))
		log.Warn("import wrapper: fail to report working progress percent value to RootCoord", logFields...)
	}

	return nil
}

// closeWorkingSegment marks a segment to be sealed
func (p *ImportWrapper) closeWorkingSegment(segment *WorkingSegment) error {
	logFields := []zap.Field{
		zap.Int("shardID", segment.shardID),
		zap.Int64("segmentID", segment.segmentID),
		zap.String("targetChannel", segment.targetChName),
		zap.Int64("rowCount", segment.rowCount),
		zap.Int("insertLogCount", len(segment.fieldsInsert)),
		zap.Int("statsLogCount", len(segment.fieldsStats)),
	}
	log.Info("import wrapper: adding segment to the correct DataNode flow graph and saving binlog paths", logFields...)

	err := p.saveSegmentFunc(segment.fieldsInsert, segment.fieldsStats, segment.segmentID, segment.targetChName,
		segment.rowCount, segment.partitionID)
	if err != nil {
		logFields = append(logFields, zap.Error(err))
		log.Warn("import wrapper: failed to seal segment", logFields...)
		return fmt.Errorf("failed to seal segment, shard id %d, segment id %d, channel '%s', error: %w",
			segment.shardID, segment.segmentID, segment.targetChName, err)
	}

	return nil
}

// closeAllWorkingSegments marks all segments to be sealed at the end of the import operation
func (p *ImportWrapper) closeAllWorkingSegments() error {
	for _, shard := range p.workingSegments {
		for _, segment := range shard {
			err := p.closeWorkingSegment(segment)
			if err != nil {
				return err
			}
		}
	}
	p.workingSegments = make(map[int]map[int64]*WorkingSegment)

	return nil
}

func (p *ImportWrapper) updateProgressPercent(percent int64) {
	// ignore illegal percent value
	if percent < 0 || percent > 100 {
		return
	}
	p.progressPercent = percent
}