Unverified commit 7744573d authored by MrPresent-Han, committed by GitHub

support params for import max file size (#24191) (#24192)

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
Parent b1afd3ea
@@ -420,6 +420,9 @@ common:
  # need to set a separated topic to stand for currently consumed timestamp for each channel
  timeticker: "timetick-channel"
+  ImportMaxFileSize: 17179869184 # 16 * 1024 * 1024 * 1024
+  # max file size to import for bulkInsert

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
#   1. TT protection;
......
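The default written into milvus.yaml is the byte form of 16 GiB; a quick sanity check of the arithmetic (plain Go, nothing Milvus-specific):

```go
package main

import "fmt"

func main() {
	// 16 * 1024 * 1024 * 1024 == 16 << 30 == 17179869184 bytes (16 GiB),
	// matching both the yaml default above and the DefaultValue of
	// fmt.Sprint(16 << 30) in the Go config change further down.
	const importMaxFileSize int64 = 16 << 30
	fmt.Println(importMaxFileSize) // 17179869184
}
```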
@@ -29,6 +29,7 @@ import (
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
@@ -43,11 +44,6 @@ const (
	// supposed size of a single block, to control a binlog file size, the max binlog file size is no more than 2*SingleBlockSize
	SingleBlockSize = 16 * 1024 * 1024 // 16MB

-	// this limitation is to avoid this OOM risk:
-	// for column-based file, we read all its data into memory, if user inputs a large file, the read() method may
-	// cost extra memory and lead to OOM.
-	MaxFileSize = 16 * 1024 * 1024 * 1024 // 16GB

	// this limitation is to avoid this OOM risk:
	// sometimes the system segment max size is a large number, a single segment's fields data might cause OOM.
	// flush the segment when its data reaches this limitation, let the compaction compact it later.
@@ -239,10 +235,10 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
			return rowBased, fmt.Errorf("the file '%s' size is zero", filePath)
		}
-		if size > MaxFileSize {
+		if size > params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() {
			log.Error("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
-				zap.Int64("fileSize", size), zap.Int64("MaxFileSize", MaxFileSize))
-			return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, MaxFileSize)
+				zap.Int64("fileSize", size), zap.Int64("MaxFileSize", params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64()))
+			return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64())
		}
		totalSize += size
	}
......
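The substance of the change: the hard-coded MaxFileSize constant is removed, and fileValidation now reads the limit from the global param table on each call, so the ceiling is configurable (and, since the item is declared refreshable, can be re-read without a restart). A self-contained sketch of the same validation pattern; checkFileSize and its maxSize parameter are illustrative names, not the Milvus API:

```go
package main

import (
	"fmt"
	"os"
)

// checkFileSize mirrors the validation above: reject empty files and
// files larger than a configured ceiling. In Milvus, maxSize comes from
// params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64().
func checkFileSize(path string, maxSize int64) error {
	info, err := os.Stat(path)
	if err != nil {
		return err
	}
	size := info.Size()
	if size == 0 {
		return fmt.Errorf("the file '%s' size is zero", path)
	}
	if size > maxSize {
		return fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", path, maxSize)
	}
	return nil
}

func main() {
	if err := checkFileSize("example.json", 16<<30); err != nil {
		fmt.Println(err)
	}
}
```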
@@ -37,6 +37,7 @@ import (
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
@@ -228,6 +229,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
	err := os.MkdirAll(TempFilesPath, os.ModePerm)
	assert.Nil(t, err)
	defer os.RemoveAll(TempFilesPath)
+	params.Params.Init()

	// NewDefaultFactory() uses "/tmp/milvus" as default root path, and cannot specify root path
	// NewChunkManagerFactory() can specify the root path
@@ -618,7 +620,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
	segmentSize := 512 // unit: MB
	wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil)
+	params.Params.Init()
	t.Run("unsupported file type", func(t *testing.T) {
		files := []string{"uid.txt"}
		rowBased, err := wrapper.fileValidation(files)
@@ -675,7 +677,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
	t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
		files := []string{"a/1.json"}
-		cm.size = MaxFileSize + 1
+		cm.size = params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
		wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil)
		rowBased, err := wrapper.fileValidation(files)
		assert.NotNil(t, err)
......
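Both tests gain a params.Params.Init() call because the limit is now resolved through the global param table, which must be populated before the first GetAs* accessor runs. A minimal sketch of that ordering; the import path, accessor chain, and default are taken from the hunks above, but the test itself is hypothetical, not part of this commit:

```go
package importutil

import (
	"testing"

	"github.com/milvus-io/milvus/internal/querycoordv2/params"
)

// Hypothetical test: Init() must run before GetAsInt64, otherwise the
// param table has no loaded sources to resolve the key against.
func Test_ImportMaxFileSizeDefault(t *testing.T) {
	params.Params.Init()
	limit := params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64()
	if limit != 16<<30 {
		t.Fatalf("unexpected default ImportMaxFileSize: %d", limit)
	}
}
```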
@@ -117,7 +117,7 @@ func (p *NumpyParser) IDRange() []int64 {
// Parse is the function entry
func (p *NumpyParser) Parse(filePaths []string) error {
	// check redundant files for column-based import
-	// if the field is primary key and autoid is false, the file is required
+	// if the field is primary key and autoID is false, the file is required
	// any redundant file is not allowed
	err := p.validateFileNames(filePaths)
	if err != nil {
@@ -126,7 +126,7 @@ func (p *NumpyParser) Parse(filePaths []string) error {
	// open files and verify file header
	readers, err := p.createReaders(filePaths)
-	// make sure all the files are closed finially, must call this method before the function return
+	// make sure all the files are closed finally, must call this method before the function return
	defer closeReaders(readers)
	if err != nil {
		return err
......
@@ -216,6 +216,8 @@ type commonConfig struct {
	TimeTicker    ParamItem `refreshable:"true"`
	JSONMaxLength ParamItem `refreshable:"false"`
+
+	ImportMaxFileSize ParamItem `refreshable:"true"`
}

func (p *commonConfig) init(base *BaseTable) {
@@ -643,6 +645,13 @@ like the old password verification when updating the credential`,
		DefaultValue: fmt.Sprint(64 << 10),
	}
	p.JSONMaxLength.Init(base.mgr)
+
+	p.ImportMaxFileSize = ParamItem{
+		Key:          "common.ImportMaxFileSize",
+		Version:      "2.2.9",
+		DefaultValue: fmt.Sprint(16 << 30),
+	}
+	p.ImportMaxFileSize.Init(base.mgr)
}

type traceConfig struct {
......
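The declaration follows the existing JSONMaxLength pattern: each ParamItem records its yaml key, the version that introduced it, and a string default, and Init(base.mgr) registers it with the config manager so typed getters can parse the value on demand. A toy stand-in illustrating that string-default/typed-getter round trip; this is not the real paramtable.ParamItem, just the shape visible in the diff:

```go
package main

import (
	"fmt"
	"strconv"
)

// Toy stand-in for a config item: a string default parsed by a typed
// getter, mirroring DefaultValue: fmt.Sprint(16 << 30) + GetAsInt64().
type paramItem struct {
	Key          string
	Version      string
	DefaultValue string
}

func (p paramItem) GetAsInt64() int64 {
	v, _ := strconv.ParseInt(p.DefaultValue, 10, 64)
	return v
}

func main() {
	importMaxFileSize := paramItem{
		Key:          "common.ImportMaxFileSize",
		Version:      "2.2.9",
		DefaultValue: fmt.Sprint(16 << 30),
	}
	fmt.Println(importMaxFileSize.GetAsInt64()) // 17179869184
}
```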