未验证 提交 a327e987 编写于 作者: M MrPresent-Han 提交者: GitHub

support parms for import maxfilesize(#24191) (#24194)

Signed-off-by: NMrPresent-Han <chun.han@zilliz.com>
上级 39d62f4e
......@@ -460,6 +460,9 @@ common:
ttl: 20 # ttl value when session granting a lease to register service
retryTimes: 30 # retry times when session sending etcd requests
ImportMaxFileSize: 17179869184 # 16 * 1024 * 1024 * 1024
# max file size to import for bulkInsert
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
# 1. TT protection;
......
......@@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
......@@ -82,6 +83,7 @@ func TestDataNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
params.Params.Init()
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
......@@ -102,6 +104,7 @@ func TestDataNode(t *testing.T) {
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
Params.DataNodeCfg.SetNodeID(1)
t.Run("Test WatchDmChannels ", func(t *testing.T) {
emptyNode := &DataNode{}
......
......@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
......@@ -43,11 +44,6 @@ const (
// supposed size of a single block, to control a binlog file size, the max biglog file size is no more than 2*SingleBlockSize
SingleBlockSize = 16 * 1024 * 1024 // 16MB
// this limitation is to avoid this OOM risk:
// for column-based file, we read all its data into memory, if user input a large file, the read() method may
// cost extra memory and lear to OOM.
MaxFileSize = 16 * 1024 * 1024 * 1024 // 16GB
// this limitation is to avoid this OOM risk:
// simetimes system segment max size is a large number, a single segment fields data might cause OOM.
// flush the segment when its data reach this limitation, let the compaction to compact it later.
......@@ -238,10 +234,10 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
return rowBased, fmt.Errorf("the file '%s' size is zero", filePath)
}
if size > MaxFileSize {
if size > params.Params.CommonCfg.ImportMaxFileSize {
log.Error("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
zap.Int64("fileSize", size), zap.Int64("MaxFileSize", MaxFileSize))
return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, MaxFileSize)
zap.Int64("fileSize", size), zap.Int64("MaxFileSize", params.Params.CommonCfg.ImportMaxFileSize))
return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, params.Params.CommonCfg.ImportMaxFileSize)
}
totalSize += size
}
......
......@@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
......@@ -227,6 +228,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -391,6 +393,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -487,6 +490,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -581,6 +585,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
func Test_ImportWrapperFileValidation(t *testing.T) {
ctx := context.Background()
params.Params.Init()
cm := &MockChunkManager{
size: 1,
......@@ -667,7 +672,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
files := []string{"a/1.json"}
cm.size = MaxFileSize + 1
cm.size = params.Params.CommonCfg.ImportMaxFileSize + 1
wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.NotNil(t, err)
......@@ -687,6 +692,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -748,6 +754,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -798,6 +805,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
params.Params.Init()
// NewDefaultFactory() use "/tmp/milvus" as default root path, and cannot specify root path
// NewChunkManagerFactory() can specify the root path
......@@ -863,6 +871,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {
func Test_ImportWrapperDoBinlogImport(t *testing.T) {
ctx := context.Background()
params.Params.Init()
cm := &MockChunkManager{
size: 1,
......@@ -923,6 +932,7 @@ func Test_ImportWrapperDoBinlogImport(t *testing.T) {
func Test_ImportWrapperReportPersisted(t *testing.T) {
ctx := context.Background()
tr := timerecord.NewTimeRecorder("test")
params.Params.Init()
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
......@@ -972,6 +982,7 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {
ctx := context.Background()
params.Params.Init()
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, nil, nil, nil, nil)
assert.NotNil(t, wrapper)
......
......@@ -49,7 +49,8 @@ const (
DefaultLoadNumThreadRatio = 8.0
DefaultBeamWidthRatio = 4.0
DefaultGrpcRetryTimes = 5
DefaultGrpcRetryTimes = 5
DefaultImportMaxFileSize = 16 * 1024 * 1024 * 1024
)
// ComponentParam is used to quickly and easily access all components' configurations.
......@@ -175,7 +176,8 @@ type commonConfig struct {
SessionTTL int64
SessionRetryTimes int64
GrpcRetryTimes uint
GrpcRetryTimes uint
ImportMaxFileSize int64
}
func (p *commonConfig) init(base *BaseTable) {
......@@ -230,6 +232,7 @@ func (p *commonConfig) init(base *BaseTable) {
p.initSessionTTL()
p.initSessionRetryTimes()
p.initGrpcRetryTimes()
p.initImportMaxFileSize()
}
func (p *commonConfig) initClusterPrefix() {
......@@ -498,6 +501,10 @@ func (p *commonConfig) initGrpcRetryTimes() {
p.GrpcRetryTimes = uint(p.Base.ParseIntWithDefault("grpc.server.retryTimes", DefaultGrpcRetryTimes))
}
func (p *commonConfig) initImportMaxFileSize() {
p.ImportMaxFileSize = p.Base.ParseInt64WithDefault("common.ImportMaxFileSize", DefaultImportMaxFileSize)
}
// /////////////////////////////////////////////////////////////////////////////
// --- rootcoord ---
type rootCoordConfig struct {
......
......@@ -139,6 +139,11 @@ func TestComponentParam(t *testing.T) {
Params.initGrpcRetryTimes()
assert.Equal(t, Params.GrpcRetryTimes, uint(6))
assert.Equal(t, Params.ImportMaxFileSize, int64(DefaultImportMaxFileSize))
Params.Base.Save("common.ImportMaxFileSize", "600")
Params.initImportMaxFileSize()
assert.Equal(t, int64(600), Params.ImportMaxFileSize)
Params.Base.Save("common.security.superUsers", "super1,super2,super3")
Params.initSuperUsers()
assert.Equal(t, []string{"super1", "super2", "super3"}, Params.SuperUsers)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册