Unverified · Commit 9d8f422c authored by Ten Thousand Leaves, committed by GitHub

Reduce maxFieldNum from 256 to 64 (#21104)

issue: #21077

/kind improvement
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Parent: f31d5fac
@@ -157,7 +157,9 @@ proxy:
   timeTick:
     bufSize: 512
   maxNameLength: 255 # Maximum length of name for a collection or alias
-  maxFieldNum: 256 # Maximum number of fields in a collection
+  maxFieldNum: 64 # Maximum number of fields in a collection.
+  # As of today (2.2.0 and after) it is strongly DISCOURAGED to set maxFieldNum >= 64.
+  # So adjust at your own risk!
   maxDimension: 32768 # Maximum dimension of a vector
   maxShardNum: 256 # Maximum number of shards in a collection
   maxTaskNum: 1024 # max task number of proxy task queue
......
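For context on why this limit matters (commentary, not part of the diff): every field in a segment contributes its own binlog and statslog metadata keys, so the number of etcd operations needed to persist one segment grows roughly linearly with the field count, and a 256-field collection can push a single segment's save past one etcd transaction's op limit. A back-of-the-envelope sketch; `estimateSegmentKvs` and its constants are illustrative assumptions, not Milvus APIs:

```go
package main

import "fmt"

// estimateSegmentKvs is a hypothetical illustration: assume one binlog key and
// one statslog key per field, plus one key for the segment info itself.
func estimateSegmentKvs(numFields int) int {
	return 2*numFields + 1
}

func main() {
	const maxEtcdTxnNum = 128 // the limit raised by this commit
	for _, fields := range []int{64, 256} {
		kvs := estimateSegmentKvs(fields)
		fmt.Printf("fields=%d -> ~%d kvs (txn limit %d, exceeded: %v)\n",
			fields, kvs, maxEtcdTxnNum, kvs > maxEtcdTxnNum)
	}
}
```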
@@ -38,7 +38,7 @@ import (
 	"golang.org/x/exp/maps"
 )

-const maxEtcdTxnNum = 64
+var maxEtcdTxnNum = 128

 type Catalog struct {
 	Txn kv.TxnKV
@@ -150,6 +150,10 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
 		}
 		maps.Copy(kvsPiece, kvs)
 		currSize += len(kvs)
+		if len(kvs) >= maxEtcdTxnNum {
+			log.Warn("a single segment's Etcd save has over maxEtcdTxnNum operations." +
+				" Please double check your <proxy.maxFieldNum> config")
+		}
 	}
 	if currSize > 0 {
 		if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
@@ -643,10 +647,12 @@ func buildFieldBinlogPath(collectionID typeutil.UniqueID, partitionID typeutil.U
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentBinlogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }

+// TODO: There's no need to include fieldID in the delta log path key.
 func buildFieldDeltalogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentDeltalogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }

+// TODO: There's no need to include fieldID in the stats log path key.
 func buildFieldStatslogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentStatslogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }
......
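The TODOs above point at the root cause: the meta key layout fans out per field. A minimal sketch of that layout, reusing the format string from `buildFieldBinlogPath` in the diff; the prefix value and the ID numbers are made-up placeholders, and `typeutil.UniqueID` is replaced by plain `int64` to keep it self-contained:

```go
package main

import "fmt"

// Placeholder prefix; the real constant lives in the datacoord metastore.
const segmentBinlogPathPrefix = "datacoord-meta/binlog"

// Same key layout as buildFieldBinlogPath in the diff.
func buildFieldBinlogPath(collectionID, partitionID, segmentID, fieldID int64) string {
	return fmt.Sprintf("%s/%d/%d/%d/%d", segmentBinlogPathPrefix, collectionID, partitionID, segmentID, fieldID)
}

func main() {
	// One key per field: a 256-field segment needs 256 binlog keys alone.
	for fieldID := int64(100); fieldID < 103; fieldID++ {
		fmt.Println(buildFieldBinlogPath(1, 2, 3, fieldID))
	}
}
```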
@@ -423,7 +423,7 @@ func Test_AlterSegments(t *testing.T) {
 	err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL})
 	assert.Nil(t, err)
 	assert.Equal(t, 255+3, len(savedKvs))
-	assert.Equal(t, 5, opGroupCount)
+	assert.Equal(t, 3, opGroupCount)

 	adjustedSeg, err := catalog.LoadFromSegmentPath(segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID)
 	assert.NoError(t, err)
@@ -587,8 +587,8 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
 	// testing for reaching max operation
 	{
-		segments2 := make([]*datapb.SegmentInfo, 65)
-		for i := 0; i < 65; i++ {
+		segments2 := make([]*datapb.SegmentInfo, 129)
+		for i := 0; i < 129; i++ {
 			segments2[i] = &datapb.SegmentInfo{
 				ID:           int64(i),
 				CollectionID: 1000,
@@ -601,7 +601,7 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
 		err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2)
 		assert.Nil(t, err)
 		assert.Equal(t, 2, count)
-		assert.Equal(t, 65, kvSize)
+		assert.Equal(t, 129, kvSize)
 	}
 }
......
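The updated expectations follow directly from the batching arithmetic: the XL segment produces 255+3 = 258 kvs, and ceil(258/64) = 5 op groups under the old limit versus ceil(258/128) = 3 under the new one; likewise the 129 dropped-segment kvs fit in ceil(129/128) = 2 batches. A quick sketch to verify (hypothetical helper, not test code):

```go
package main

import "fmt"

// batches mirrors how SaveByBatchWithLimit splits n kvs into groups of at
// most limit entries: ceil(n/limit).
func batches(n, limit int) int {
	return (n + limit - 1) / limit
}

func main() {
	fmt.Println(batches(258, 64))  // 5 op groups under the old maxTxnNum
	fmt.Println(batches(258, 128)) // 3 op groups under the new maxTxnNum
	fmt.Println(batches(129, 128)) // 2 batches for the dropped-segments test
}
```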
@@ -72,7 +72,7 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, sa
 	saveFn := func(partialKvs map[string]string) error {
 		return snapshot.MultiSave(partialKvs, ts)
 	}
-	if err := etcd.SaveByBatch(saves, saveFn); err != nil {
+	if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum/2, saveFn); err != nil {
 		return err
 	}
@@ -132,7 +132,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
 	// Though batchSave is not atomic enough, we can promise the atomicity outside.
 	// Recovering from failure, if we found collection is creating, we should removing all these related meta.
-	return etcd.SaveByBatch(kvs, func(partialKvs map[string]string) error {
+	return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
 		return kc.Snapshot.MultiSave(partialKvs, ts)
 	})
 }
......
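A note on the `maxTxnNum/2` argument (my reading, not stated in the diff): a snapshot-backed MultiSave appears to write an extra versioned key per entry, so halving the batch size keeps the resulting etcd transaction within the op cap. A sketch under that assumption:

```go
package main

import "fmt"

func main() {
	// Assumption (not confirmed by the diff): each snapshot MultiSave entry
	// costs two etcd ops, the key itself plus a timestamped copy, hence limit/2.
	const maxTxnNum = 128
	const opsPerEntry = 2
	batch := maxTxnNum / 2
	fmt.Printf("batch of %d entries -> %d etcd ops (cap %d)\n",
		batch, batch*opsPerEntry, maxTxnNum) // 64 entries -> 128 ops, exactly at the cap
}
```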
@@ -32,7 +32,7 @@ import (
 )

 var (
-	maxTxnNum = 64
+	maxTxnNum = 128
 )

 // GetEtcdClient returns etcd client
@@ -110,8 +110,8 @@ func min(a, b int) int {
 	return b
 }

-// SaveByBatch does not guarantee atomicity
-func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error {
+// SaveByBatchWithLimit is SaveByBatch with a customized batch size limit.
+func SaveByBatchWithLimit(kvs map[string]string, limit int, op func(partialKvs map[string]string) error) error {
 	if len(kvs) == 0 {
 		return nil
 	}
@@ -124,8 +124,8 @@ func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) er
 		values = append(values, v)
 	}

-	for i := 0; i < len(kvs); i = i + maxTxnNum {
-		end := min(i+maxTxnNum, len(keys))
+	for i := 0; i < len(kvs); i = i + limit {
+		end := min(i+limit, len(keys))
 		batch, err := buildKvGroup(keys[i:end], values[i:end])
 		if err != nil {
 			return err
@@ -138,6 +138,11 @@ func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) er
 	return nil
 }

+// SaveByBatch does not guarantee atomicity.
+func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error {
+	return SaveByBatchWithLimit(kvs, maxTxnNum, op)
+}
+
 func RemoveByBatch(removals []string, op func(partialKeys []string) error) error {
 	if len(removals) == 0 {
 		return nil
......
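To make the new helper's behavior concrete, here is a self-contained re-sketch of `SaveByBatchWithLimit`'s batching loop plus a usage example. The real implementation collects keys and values and assembles each batch through `buildKvGroup`, so treat this as a simplified model, not the actual code:

```go
package main

import "fmt"

// saveByBatchWithLimit is a simplified, self-contained model of the helper
// added in this commit: flush kvs via op in batches of at most limit entries.
func saveByBatchWithLimit(kvs map[string]string, limit int, op func(map[string]string) error) error {
	if len(kvs) == 0 {
		return nil
	}
	batch := make(map[string]string, limit)
	for k, v := range kvs {
		batch[k] = v
		if len(batch) == limit {
			if err := op(batch); err != nil {
				return err
			}
			batch = make(map[string]string, limit)
		}
	}
	if len(batch) > 0 {
		return op(batch) // flush the final partial batch
	}
	return nil
}

func main() {
	kvs := map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}
	// With limit 2, the three kvs are flushed in two calls to op.
	_ = saveByBatchWithLimit(kvs, 2, func(partial map[string]string) error {
		fmt.Printf("flushing %d kvs in one txn\n", len(partial))
		return nil
	})
}
```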
@@ -57,7 +57,7 @@ compact_segment_num_threshold = 3
 compact_delta_ratio_reciprocal = 5 # compact_delta_binlog_ratio is 0.2
 compact_retention_duration = 40 # compaction travel time retention range 20s
 max_compaction_interval = 60 # the max time interval (s) from the last compaction
-max_field_num = 256 # Maximum number of fields in a collection
+max_field_num = 64 # Maximum number of fields in a collection
 default_replica_num = 1
 IMAGE_REPOSITORY_MILVUS = "harbor.milvus.io/dockerhub/milvusdb/milvus"
 NAMESPACE_CHAOS_TESTING = "chaos-testing"
......