// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importutil

import (
	"bufio"
	"context"
	"fmt"
	"math"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/timerecord"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
	JSONFileExt  = ".json"
	NumpyFileExt = ".npy"

	// supposed size of a single block, to control binlog file size; the max binlog file size is no more than 2*SingleBlockSize
	SingleBlockSize = 16 * 1024 * 1024 // 16MB

	// this limitation is to avoid the following OOM risk:
	// for column-based files, we read all data of a file into memory; if the user inputs a large file,
	// the read() method may cost extra memory and lead to OOM.
	MaxFileSize = 1 * 1024 * 1024 * 1024 // 1GB

	// this limitation is to avoid the following OOM risk:
	// sometimes the system's max segment size is a large number, and a single segment's fields data might cause OOM.
	// flush the segment when its data reaches this limitation, and let compaction compact it later.
	MaxSegmentSizeInMemory = 512 * 1024 * 1024 // 512MB

	// this limitation is to avoid the following OOM risk:
	// if the shard number is large, although a single segment size is small, there are lots of in-memory segments,
	// and the total memory size might cause OOM.
	MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB
)

// ReportImportAttempts is the maximum # of attempts to retry when import fails.
var ReportImportAttempts uint = 10

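// callback function types used by the import wrapper to interact with the data node:
// ImportFlushFunc flushes a block of fields data of one shard,
// AssignSegmentFunc prepares a new segment for a shard,
// CreateBinlogsFunc creates binlogs for a segment,
// SaveSegmentFunc persists a segment.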
type ImportFlushFunc func(fields map[storage.FieldID]storage.FieldData, shardID int) error
type AssignSegmentFunc func(shardID int) (int64, string, error)
type CreateBinlogsFunc func(fields map[storage.FieldID]storage.FieldData, segmentID int64) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error)
type SaveSegmentFunc func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error

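// WorkingSegment holds the state of an in-progress segment that accumulates binlogs for one shard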
type WorkingSegment struct {
	segmentID    int64                 // segment ID
	shardID      int                   // shard id
	targetChName string                // target dml channel
	rowCount     int64                 // accumulate row count
	memSize      int                   // total memory size of all binlogs
	fieldsInsert []*datapb.FieldBinlog // persisted binlogs
	fieldsStats  []*datapb.FieldBinlog // stats of persisted binlogs
}

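// ImportOptions defines the options of an import task: validate-only mode and the timestamp range to import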
type ImportOptions struct {
	OnlyValidate bool
	TsStartPoint uint64
	TsEndPoint   uint64
}

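// DefaultImportOptions returns an ImportOptions with default values: no validate-only mode, full timestamp range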
func DefaultImportOptions() ImportOptions {
	options := ImportOptions{
		OnlyValidate: false,
		TsStartPoint: 0,
		TsEndPoint:   math.MaxUint64,
	}
	return options
}

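// ImportWrapper is the entry point of an import task: it validates the input files,
// parses row-based json files or column-based numpy files, splits the data into shards,
// and persists segments through the callback functions.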
type ImportWrapper struct {
	ctx              context.Context            // for canceling parse process
	cancel           context.CancelFunc         // for canceling parse process
	collectionSchema *schemapb.CollectionSchema // collection schema
	shardNum         int32                      // sharding number of the collection
	segmentSize      int64                      // maximum size of a segment (unit: byte), defined by dataCoord.segment.maxSize (milvus.yml)
	rowIDAllocator   *allocator.IDAllocator     // autoid allocator
	chunkManager     storage.ChunkManager

	assignSegmentFunc AssignSegmentFunc // function to prepare a new segment
	createBinlogsFunc CreateBinlogsFunc // function to create binlog for a segment
	saveSegmentFunc   SaveSegmentFunc   // function to persist a segment

	importResult *rootcoordpb.ImportResult                 // import result
	reportFunc   func(res *rootcoordpb.ImportResult) error // report import state to rootcoord

	workingSegments map[int]*WorkingSegment // a map from shard id to working segment
}

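// NewImportWrapper creates an ImportWrapper with a copy of the collection schema
// that excludes the internal RowID and Timestamp fields; it returns nil if the schema is nil.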
func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.CollectionSchema, shardNum int32, segmentSize int64,
	idAlloc *allocator.IDAllocator, cm storage.ChunkManager, importResult *rootcoordpb.ImportResult,
	reportFunc func(res *rootcoordpb.ImportResult) error) *ImportWrapper {
	if collectionSchema == nil {
		log.Error("import wrapper: collection schema is nil")
		return nil
	}

	// ignore the RowID field and Timestamp field
	realSchema := &schemapb.CollectionSchema{
		Name:        collectionSchema.GetName(),
		Description: collectionSchema.GetDescription(),
		AutoID:      collectionSchema.GetAutoID(),
		Fields:      make([]*schemapb.FieldSchema, 0),
	}
	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		if schema.GetName() == common.RowIDFieldName || schema.GetName() == common.TimeStampFieldName {
			continue
		}
		realSchema.Fields = append(realSchema.Fields, schema)
	}

	ctx, cancel := context.WithCancel(ctx)

	wrapper := &ImportWrapper{
		ctx:              ctx,
		cancel:           cancel,
		collectionSchema: realSchema,
		shardNum:         shardNum,
		segmentSize:      segmentSize,
		rowIDAllocator:   idAlloc,
		chunkManager:     cm,
		importResult:     importResult,
		reportFunc:       reportFunc,
		workingSegments:  make(map[int]*WorkingSegment),
	}

	return wrapper
}

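// SetCallbackFunctions sets the callback functions used to assign a segment, create binlogs and save a segment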
func (p *ImportWrapper) SetCallbackFunctions(assignSegmentFunc AssignSegmentFunc, createBinlogsFunc CreateBinlogsFunc, saveSegmentFunc SaveSegmentFunc) error {
	if assignSegmentFunc == nil {
		log.Error("import wrapper: callback function AssignSegmentFunc is nil")
		return fmt.Errorf("import wrapper: callback function AssignSegmentFunc is nil")
	}

	if createBinlogsFunc == nil {
		log.Error("import wrapper: callback function CreateBinlogsFunc is nil")
		return fmt.Errorf("import wrapper: callback function CreateBinlogsFunc is nil")
	}

	if saveSegmentFunc == nil {
		log.Error("import wrapper: callback function SaveSegmentFunc is nil")
		return fmt.Errorf("import wrapper: callback function SaveSegmentFunc is nil")
	}

	p.assignSegmentFunc = assignSegmentFunc
	p.createBinlogsFunc = createBinlogsFunc
	p.saveSegmentFunc = saveSegmentFunc
	return nil
}

// Cancel method can be used to cancel parse process
func (p *ImportWrapper) Cancel() error {
	p.cancel()
	return nil
}

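// validateColumnBasedFiles checks that each numpy file corresponds to a field of the collection,
// and that each required field (auto-generated primary key excluded) has a corresponding file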
func (p *ImportWrapper) validateColumnBasedFiles(filePaths []string, collectionSchema *schemapb.CollectionSchema) error {
	requiredFieldNames := make(map[string]interface{})
	for _, schema := range p.collectionSchema.Fields {
		if schema.GetIsPrimaryKey() {
			if !schema.GetAutoID() {
				requiredFieldNames[schema.GetName()] = nil
			}
		} else {
			requiredFieldNames[schema.GetName()] = nil
		}
	}

	// check redundant file
	fileNames := make(map[string]interface{})
	for _, filePath := range filePaths {
		name, _ := GetFileNameAndExt(filePath)
		fileNames[name] = nil
		_, ok := requiredFieldNames[name]
		if !ok {
			log.Error("import wrapper: the file has no corresponding field in collection", zap.String("fieldName", name))
			return fmt.Errorf("import wrapper: the file '%s' has no corresponding field in collection", filePath)
		}
	}

	// check missed file
	for name := range requiredFieldNames {
		_, ok := fileNames[name]
		if !ok {
			log.Error("import wrapper: there is no file corresponding to field", zap.String("fieldName", name))
			return fmt.Errorf("import wrapper: there is no file corresponding to field '%s'", name)
		}
	}

	return nil
}

// fileValidation verifies the input paths
// if all the files are json type, returns true
// if all the files are numpy type, returns false; duplicate file names are not allowed
func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
	// use this map to check duplicate file name(only for numpy file)
	fileNames := make(map[string]struct{})

	totalSize := int64(0)
	rowBased := false
	for i := 0; i < len(filePaths); i++ {
		filePath := filePaths[i]
		name, fileType := GetFileNameAndExt(filePath)

		// only allow json file or numpy file
		if fileType != JSONFileExt && fileType != NumpyFileExt {
			log.Error("import wrapper: unsupported file type", zap.String("filePath", filePath))
			return false, fmt.Errorf("import wrapper: unsupported file type: '%s'", filePath)
		}

		// we use the first file to determine row-based or column-based
		if i == 0 && fileType == JSONFileExt {
			rowBased = true
		}

		// check file type
		// row-based only supports json type, column-based only supports numpy type
		if rowBased {
			if fileType != JSONFileExt {
				log.Error("import wrapper: unsupported file type for row-based mode", zap.String("filePath", filePath))
				return rowBased, fmt.Errorf("import wrapper: unsupported file type for row-based mode: '%s'", filePath)
			}
		} else {
			if fileType != NumpyFileExt {
				log.Error("import wrapper: unsupported file type for column-based mode", zap.String("filePath", filePath))
				return rowBased, fmt.Errorf("import wrapper: unsupported file type for column-based mode: '%s'", filePath)
			}
		}

		// check duplicate file
		_, ok := fileNames[name]
		if ok {
			log.Error("import wrapper: duplicate file name", zap.String("filePath", filePath))
			return rowBased, fmt.Errorf("import wrapper: duplicate file: '%s'", filePath)
		}
		fileNames[name] = struct{}{}

		// check file size, single file size cannot exceed MaxFileSize
		// TODO add context
		size, err := p.chunkManager.Size(context.TODO(), filePath)
		if err != nil {
			log.Error("import wrapper: failed to get file size", zap.String("filePath", filePath), zap.Error(err))
			return rowBased, fmt.Errorf("import wrapper: failed to get file size of '%s'", filePath)
		}

		// empty file
		if size == 0 {
			log.Error("import wrapper: file size is zero", zap.String("filePath", filePath))
			return rowBased, fmt.Errorf("import wrapper: the file '%s' size is zero", filePath)
		}

		if size > MaxFileSize {
			log.Error("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
				zap.Int64("fileSize", size), zap.Int64("MaxFileSize", MaxFileSize))
			return rowBased, fmt.Errorf("import wrapper: the file '%s' size exceeds the maximum size: %d bytes", filePath, MaxFileSize)
		}
		totalSize += size
	}

	// especially for column-based, the total size of files cannot exceed MaxTotalSizeInMemory
	if totalSize > MaxTotalSizeInMemory {
		log.Error("import wrapper: total size of files exceeds the maximum size", zap.Int64("totalSize", totalSize), zap.Int64("MaxTotalSize", MaxTotalSizeInMemory))
		return rowBased, fmt.Errorf("import wrapper: total size(%d bytes) of all files exceeds the maximum size: %d bytes", totalSize, MaxTotalSizeInMemory)
	}

	// check redundant files for column-based import
	// if the field is primary key and autoid is false, the file is required
	// any redundant file is not allowed
	if !rowBased {
		err := p.validateColumnBasedFiles(filePaths, p.collectionSchema)
		if err != nil {
			return rowBased, err
		}
	}

	return rowBased, nil
}

// Import is the entry of import operation
// filePaths and options are from the ImportTask
// if onlyValidate is true, this process only does validation, no data is generated, and flushFunc will not be called
func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error {
	log.Info("import wrapper: begin import", zap.Any("filePaths", filePaths), zap.Any("options", options))
	// data restore function to import milvus native binlog files (for backup/restore tools)
	// the backup/restore tool provides two paths for a partition: the first path is the binlog path, the second is the deltalog path
	if p.isBinlogImport(filePaths) {
		// TODO: handle the timestamp end point passed from client side, currently use math.MaxUint64
		return p.doBinlogImport(filePaths, options.TsStartPoint, options.TsEndPoint)
	}

	// normal logic for import general data files
	rowBased, err := p.fileValidation(filePaths)
	if err != nil {
		return err
	}

	if rowBased {
		// parse and consume row-based files
		// for row-based files, the JSONRowConsumer will generate autoid for primary key, and split rows into segments
		// according to shard number, so the flushFunc will be called in the JSONRowConsumer
		for i := 0; i < len(filePaths); i++ {
			filePath := filePaths[i]
			_, fileType := GetFileNameAndExt(filePath)
			log.Info("import wrapper: row-based file", zap.Any("filePath", filePath), zap.Any("fileType", fileType))

			if fileType == JSONFileExt {
				err = p.parseRowBasedJSON(filePath, options.OnlyValidate)
				if err != nil {
					log.Error("import wrapper: failed to parse row-based json file", zap.Error(err), zap.String("filePath", filePath))
					return err
				}
			} // no need to check else, since the fileValidation() already does this

			// trigger gc after each file finished
			triggerGC()
		}
	} else {
		// parse and consume column-based files
		// for column-based files, the XXXColumnConsumer only outputs map[string]storage.FieldData
		// after all columns are parsed/consumed, we need to combine map[string]storage.FieldData into one
		// and use splitFieldsData() to split fields data into segments according to shard number
		fieldsData := initSegmentData(p.collectionSchema)
		if fieldsData == nil {
			log.Error("import wrapper: failed to initialize FieldData list")
			return fmt.Errorf("import wrapper: failed to initialize FieldData list")
		}

		rowCount := 0

		// function to combine column data into fieldsData
		combineFunc := func(fields map[storage.FieldID]storage.FieldData) error {
			if len(fields) == 0 {
				return nil
			}

			printFieldsDataInfo(fields, "import wrapper: combine field data", nil)
			tr := timerecord.NewTimeRecorder("combine field data")
			defer tr.Elapse("finished")

			for k, v := range fields {
				// ignore 0 row field
				if v.RowNum() == 0 {
					log.Warn("import wrapper: empty FieldData ignored", zap.Int64("fieldID", k))
					continue
				}

				// ignore internal fields: RowIDField and TimeStampField
				if k == common.RowIDField || k == common.TimeStampField {
					log.Warn("import wrapper: internal fields should not be provided", zap.Int64("fieldID", k))
					continue
				}

				// each column should be only combined once
				data, ok := fieldsData[k]
				if ok && data.RowNum() > 0 {
					return fmt.Errorf("the field %d is duplicated", k)
				}

				// check the row count. only count non-zero row fields
				if rowCount > 0 && rowCount != v.RowNum() {
					return fmt.Errorf("the field %d row count %d doesn't equal others row count: %d", k, v.RowNum(), rowCount)
				}
				rowCount = v.RowNum()

				// assign column data to fieldsData
				fieldsData[k] = v
			}

			return nil
		}

		// parse/validate/consume data
		for i := 0; i < len(filePaths); i++ {
			filePath := filePaths[i]
			_, fileType := GetFileNameAndExt(filePath)
			log.Info("import wrapper: column-based file", zap.Any("filePath", filePath), zap.Any("fileType", fileType))

			if fileType == NumpyFileExt {
				err = p.parseColumnBasedNumpy(filePath, options.OnlyValidate, combineFunc)

				if err != nil {
					log.Error("import wrapper: failed to parse column-based numpy file", zap.Error(err), zap.String("filePath", filePath))
					return err
				}
			}
			// no need to check else, since the fileValidation() already does this
		}

		// trigger after read finished
		triggerGC()

		// split fields data into segments
		err := p.splitFieldsData(fieldsData, SingleBlockSize)
		if err != nil {
			return err
		}

		// trigger after write finished
		triggerGC()
	}

	return p.reportPersisted()
}

// reportPersisted notifies the rootcoord to mark the task state as ImportPersisted
func (p *ImportWrapper) reportPersisted() error {
	// force close all segments
	err := p.closeAllWorkingSegments()
	if err != nil {
		return err
	}

	// report file process state
	p.importResult.State = commonpb.ImportState_ImportPersisted
	// the ImportPersisted state is valuable; retry more times in case this task fails only because of a network error
	reportErr := retry.Do(p.ctx, func() error {
		return p.reportFunc(p.importResult)
	}, retry.Attempts(ReportImportAttempts))
	if reportErr != nil {
		log.Warn("import wrapper: fail to report import state to RootCoord", zap.Error(reportErr))
		return reportErr
	}
	return nil
}

// isBinlogImport checks whether this is a binlog import operation
// For internal usage by the restore tool: https://github.com/zilliztech/milvus-backup
// This tool exports data from a milvus service, and calls the bulkload interface to import native data into another milvus service.
// This tool provides two paths: one is the data log path of a partition, the other is the delta log path of this partition.
// This method checks the filePaths: if the paths are directories (no file extension) rather than files, we treat it as binlog import.
func (p *ImportWrapper) isBinlogImport(filePaths []string) bool {
	// must contain the insert log path, and the delta log path is optional
	if len(filePaths) != 1 && len(filePaths) != 2 {
		log.Info("import wrapper: paths count is not 1 or 2, not binlog import", zap.Int("len", len(filePaths)))
		return false
	}

	for i := 0; i < len(filePaths); i++ {
		filePath := filePaths[i]
		_, fileType := GetFileNameAndExt(filePath)
		// if the path contains a file extension, it is not a directory path
		if len(fileType) != 0 {
			log.Info("import wrapper: not a path, not binlog import", zap.String("filePath", filePath), zap.String("fileType", fileType))
			return false
		}
	}

	log.Info("import wrapper: do binlog import")
	return true
}

// doBinlogImport is the entry of binlog import operation
func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64, tsEndPoint uint64) error {
	flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
		printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
		return p.flushFunc(fields, shardID)
	}
	parser, err := NewBinlogParser(p.ctx, p.collectionSchema, p.shardNum, SingleBlockSize, p.chunkManager, flushFunc,
		tsStartPoint, tsEndPoint)
	if err != nil {
		return err
	}

	err = parser.Parse(filePaths)
	if err != nil {
		return err
	}

	return p.reportPersisted()
}

// parseRowBasedJSON is the entry of row-based json import operation
func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) error {
	tr := timerecord.NewTimeRecorder("json row-based parser: " + filePath)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// for minio storage, chunkManager will download file into local memory
	// for local storage, chunkManager opens the file directly
	file, err := p.chunkManager.Reader(ctx, filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	// parse file
	reader := bufio.NewReader(file)
	parser := NewJSONParser(p.ctx, p.collectionSchema)
	var consumer *JSONRowConsumer
	if !onlyValidate {
		flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
			var filePaths = []string{filePath}
			printFieldsDataInfo(fields, "import wrapper: prepare to flush binlogs", filePaths)
			return p.flushFunc(fields, shardID)
		}

		consumer, err = NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, flushFunc)
		if err != nil {
			return err
		}
	}

	validator, err := NewJSONRowValidator(p.collectionSchema, consumer)
	if err != nil {
		return err
	}

	err = parser.ParseRows(reader, validator)
	if err != nil {
		return err
	}

	// for row-based files, auto-id is generated within JSONRowConsumer
	if consumer != nil {
		p.importResult.AutoIds = append(p.importResult.AutoIds, consumer.IDRange()...)
	}

	tr.Elapse("parsed")
	return nil
}

// parseColumnBasedNumpy is the entry of column-based numpy import operation
func (p *ImportWrapper) parseColumnBasedNumpy(filePath string, onlyValidate bool,
	combineFunc func(fields map[storage.FieldID]storage.FieldData) error) error {
	tr := timerecord.NewTimeRecorder("numpy parser: " + filePath)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fileName, _ := GetFileNameAndExt(filePath)

	// for minio storage, chunkManager will download file into local memory
	// for local storage, chunkManager opens the file directly
	file, err := p.chunkManager.Reader(ctx, filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	var id storage.FieldID
	var found = false
	for _, field := range p.collectionSchema.Fields {
		if field.GetName() == fileName {
			id = field.GetFieldID()
			found = true
			break
		}
	}

	// if the numpy file name does not map to a field name, ignore it
	if !found {
		return nil
	}

	// the numpy parser returns a storage.FieldData; here we construct a map[storage.FieldID]storage.FieldData to combine
	flushFunc := func(field storage.FieldData) error {
		fields := make(map[storage.FieldID]storage.FieldData)
		fields[id] = field
		return combineFunc(fields)
	}

	// for numpy files, we say the file name (without extension) is the field name
	parser := NewNumpyParser(p.ctx, p.collectionSchema, flushFunc)
	err = parser.Parse(file, fileName, onlyValidate)
	if err != nil {
		return err
	}

	tr.Elapse("parsed")
	return nil
}

// appendFunc defines the methods to append data to storage.FieldData
func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storage.FieldData, n int, target storage.FieldData) error {
	switch schema.DataType {
	case schemapb.DataType_Bool:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.BoolFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(bool))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Float:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.FloatFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(float32))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Double:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.DoubleFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(float64))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Int8:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int8FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int8))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Int16:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int16FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int16))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Int32:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int32FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int32))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_Int64:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int64FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int64))
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_BinaryVector:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.BinaryVectorFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).([]byte)...)
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_FloatVector:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.FloatVectorFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).([]float32)...)
			arr.NumRows[0]++
			return nil
		}
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.StringFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(string))
			return nil
		}
	default:
		return nil
	}
}

// splitFieldsData splits the in-memory data (parsed from column-based files) into blocks; each block is saved to a binlog file
func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, blockSize int64) error {
	if len(fieldsData) == 0 {
		log.Error("import wrapper: fields data is empty")
		return fmt.Errorf("import wrapper: fields data is empty")
	}

	tr := timerecord.NewTimeRecorder("import wrapper: split field data")
	defer tr.Elapse("finished")

	// check existence of each field
	// check row count, all fields' row counts must be equal
	// firstly get the max row count
	rowCount := 0
	rowCounter := make(map[string]int)
	var primaryKey *schemapb.FieldSchema
	for i := 0; i < len(p.collectionSchema.Fields); i++ {
		schema := p.collectionSchema.Fields[i]
		if schema.GetIsPrimaryKey() {
			primaryKey = schema
		}

		if !schema.GetAutoID() {
			v, ok := fieldsData[schema.GetFieldID()]
			if !ok {
				log.Error("import wrapper: field not provided", zap.String("fieldName", schema.GetName()))
				return fmt.Errorf("import wrapper: field '%s' not provided", schema.GetName())
			}
			rowCounter[schema.GetName()] = v.RowNum()
			if v.RowNum() > rowCount {
				rowCount = v.RowNum()
			}
		}
	}
	if primaryKey == nil {
		log.Error("import wrapper: primary key field is not found")
		return fmt.Errorf("import wrapper: primary key field is not found")
	}

	for name, count := range rowCounter {
		if count != rowCount {
			log.Error("import wrapper: field row count is not equal to other fields row count", zap.String("fieldName", name),
				zap.Int("rowCount", count), zap.Int("otherRowCount", rowCount))
			return fmt.Errorf("import wrapper: field '%s' row count %d is not equal to other fields row count: %d", name, count, rowCount)
		}
	}
	log.Info("import wrapper: try to split a block with row count", zap.Int("rowCount", rowCount), zap.Any("rowCountOfEachField", rowCounter))

	primaryData, ok := fieldsData[primaryKey.GetFieldID()]
	if !ok {
		log.Error("import wrapper: primary key field is not provided", zap.String("keyName", primaryKey.GetName()))
		return fmt.Errorf("import wrapper: primary key field is not provided")
	}

	// generate auto id for primary key and rowid field
	rowIDBegin, rowIDEnd, err := p.rowIDAllocator.Alloc(uint32(rowCount))
	if err != nil {
		log.Error("import wrapper: failed to alloc row ID", zap.Error(err))
		return err
	}

	rowIDField := fieldsData[common.RowIDField]
	rowIDFieldArr := rowIDField.(*storage.Int64FieldData)
	for i := rowIDBegin; i < rowIDEnd; i++ {
		rowIDFieldArr.Data = append(rowIDFieldArr.Data, i)
	}

	if primaryKey.GetAutoID() {
		log.Info("import wrapper: generating auto-id", zap.Int("rowCount", rowCount), zap.Int64("rowIDBegin", rowIDBegin))

		// reset the primary keys, as we know, only int64 pk can be auto-generated
		primaryDataArr := &storage.Int64FieldData{
			NumRows: []int64{int64(rowCount)},
			Data:    make([]int64, 0, rowCount),
		}
		for i := rowIDBegin; i < rowIDEnd; i++ {
			primaryDataArr.Data = append(primaryDataArr.Data, i)
		}

		primaryData = primaryDataArr
		fieldsData[primaryKey.GetFieldID()] = primaryData
		p.importResult.AutoIds = append(p.importResult.AutoIds, rowIDBegin, rowIDEnd)
	}

	if primaryData.RowNum() <= 0 {
		log.Error("import wrapper: primary key not provided", zap.String("keyName", primaryKey.GetName()))
		return fmt.Errorf("import wrapper: the primary key '%s' not provided", primaryKey.GetName())
	}

	// prepare segments
	segmentsData := make([]map[storage.FieldID]storage.FieldData, 0, p.shardNum)
	for i := 0; i < int(p.shardNum); i++ {
		segmentData := initSegmentData(p.collectionSchema)
		if segmentData == nil {
			log.Error("import wrapper: failed to initialize FieldData list")
			return fmt.Errorf("import wrapper: failed to initialize FieldData list")
		}
		segmentsData = append(segmentsData, segmentData)
	}

	// prepare append functions
	appendFunctions := make(map[string]func(src storage.FieldData, n int, target storage.FieldData) error)
	for i := 0; i < len(p.collectionSchema.Fields); i++ {
		schema := p.collectionSchema.Fields[i]
		appendFuncErr := p.appendFunc(schema)
		if appendFuncErr == nil {
			log.Error("import wrapper: unsupported field data type")
			return fmt.Errorf("import wrapper: unsupported field data type")
		}
		appendFunctions[schema.GetName()] = appendFuncErr
	}

	// split data into shards
	for i := 0; i < rowCount; i++ {
		// hash to a shard number
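		// a varchar primary key is hashed by string, an int64 primary key is hashed by value,
		// so rows with the same primary key always fall into the same shard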
		var shard uint32
		pk := primaryData.GetRow(i)
		strPK, ok := interface{}(pk).(string)
		if ok {
			hash := typeutil.HashString2Uint32(strPK)
			shard = hash % uint32(p.shardNum)
		} else {
			intPK, ok := interface{}(pk).(int64)
			if !ok {
				log.Error("import wrapper: primary key field must be int64 or varchar")
				return fmt.Errorf("import wrapper: primary key field must be int64 or varchar")
			}
			hash, _ := typeutil.Hash32Int64(intPK)
			shard = hash % uint32(p.shardNum)
		}

		// set rowID field
		rowIDField := segmentsData[shard][common.RowIDField].(*storage.Int64FieldData)
		rowIDField.Data = append(rowIDField.Data, rowIDFieldArr.GetRow(i).(int64))

		// append row to shard
		for k := 0; k < len(p.collectionSchema.Fields); k++ {
			schema := p.collectionSchema.Fields[k]
			srcData := fieldsData[schema.GetFieldID()]
			targetData := segmentsData[shard][schema.GetFieldID()]
			appendFunc := appendFunctions[schema.GetName()]
			err := appendFunc(srcData, i, targetData)
			if err != nil {
				return err
			}
		}

		// when the estimated size is close to blockSize, force flush
		err = tryFlushBlocks(p.ctx, segmentsData, p.collectionSchema, p.flushFunc, blockSize, MaxTotalSizeInMemory, false)
		if err != nil {
			return err
		}
	}

	// force flush at the end
	return tryFlushBlocks(p.ctx, segmentsData, p.collectionSchema, p.flushFunc, blockSize, MaxTotalSizeInMemory, true)
}

// flushFunc is the callback function for parsers to generate segments and save binlog files
func (p *ImportWrapper) flushFunc(fields map[storage.FieldID]storage.FieldData, shardID int) error {
	// if fields data is empty, do nothing
	var rowNum int
	memSize := 0
	for _, field := range fields {
		rowNum = field.RowNum()
		memSize += field.GetMemorySize()
		break
	}
	if rowNum <= 0 {
		log.Warn("import wrapper: fields data is empty", zap.Int("shardID", shardID))
		return nil
	}

	// if there is no segment for this shard, create a new one
	// if the segment exists and its size almost exceed segmentSize, close it and create a new one
	var segment *WorkingSegment
	segment, ok := p.workingSegments[shardID]
	if ok {
		// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
		if int64(segment.memSize)+int64(memSize) >= p.segmentSize {
			err := p.closeWorkingSegment(segment)
			if err != nil {
				return err
			}
			segment = nil
			p.workingSegments[shardID] = nil
		}

	}

	if segment == nil {
		// create a new segment
		segID, channelName, err := p.assignSegmentFunc(shardID)
		if err != nil {
			log.Error("import wrapper: failed to assign a new segment", zap.Error(err), zap.Int("shardID", shardID))
			return err
		}

		segment = &WorkingSegment{
			segmentID:    segID,
			shardID:      shardID,
			targetChName: channelName,
			rowCount:     int64(0),
			memSize:      0,
			fieldsInsert: make([]*datapb.FieldBinlog, 0),
			fieldsStats:  make([]*datapb.FieldBinlog, 0),
		}
		p.workingSegments[shardID] = segment
	}

	// save binlogs
	fieldsInsert, fieldsStats, err := p.createBinlogsFunc(fields, segment.segmentID)
	if err != nil {
		log.Error("import wrapper: failed to save binlogs", zap.Error(err), zap.Int("shardID", shardID),
			zap.Int64("segmentID", segment.segmentID), zap.String("targetChannel", segment.targetChName))
		return err
	}

	segment.fieldsInsert = append(segment.fieldsInsert, fieldsInsert...)
	segment.fieldsStats = append(segment.fieldsStats, fieldsStats...)
	segment.rowCount += int64(rowNum)
	segment.memSize += memSize

	return nil
}

// closeWorkingSegment marks a segment to be sealed
func (p *ImportWrapper) closeWorkingSegment(segment *WorkingSegment) error {
	log.Info("import wrapper: adding segment to the correct DataNode flow graph and saving binlog paths",
		zap.Int("shardID", segment.shardID),
		zap.Int64("segmentID", segment.segmentID),
		zap.String("targetChannel", segment.targetChName),
		zap.Int64("rowCount", segment.rowCount),
		zap.Int("insertLogCount", len(segment.fieldsInsert)),
		zap.Int("statsLogCount", len(segment.fieldsStats)))

	err := p.saveSegmentFunc(segment.fieldsInsert, segment.fieldsStats, segment.segmentID, segment.targetChName, segment.rowCount)
	if err != nil {
		log.Error("import wrapper: failed to save segment",
			zap.Error(err),
			zap.Int("shardID", segment.shardID),
			zap.Int64("segmentID", segment.segmentID),
			zap.String("targetChannel", segment.targetChName))
		return err
	}

	return nil
}

// closeAllWorkingSegments marks all segments to be sealed at the end of the import operation
func (p *ImportWrapper) closeAllWorkingSegments() error {
	for _, segment := range p.workingSegments {
		err := p.closeWorkingSegment(segment)
		if err != nil {
			return err
		}
	}
	p.workingSegments = make(map[int]*WorkingSegment)

	return nil
}