diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 4c077af88679b28e62a3ad0b57250cbbe272bd4a..e0837042d60e13adb5e2c1dae7732d7e98b6ca15 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -20,7 +20,4 @@ master: minIDAssignCnt: 1024 maxIDAssignCnt: 16384 # old name: segmentExpireDuration: 2000 - IDAssignExpiration: 2000 # ms - - maxPartitionNum: 4096 - defaultPartitionTag: _default \ No newline at end of file + IDAssignExpiration: 2000 # ms \ No newline at end of file diff --git a/configs/advanced/proxy.yaml b/configs/advanced/proxy.yaml index 0d3163c0d1122dfe1e7998f33aba0b403f7e3b27..cc98ad4d85ed10ec7ddbc517d35eeae1eed0528f 100644 --- a/configs/advanced/proxy.yaml +++ b/configs/advanced/proxy.yaml @@ -28,4 +28,3 @@ proxy: bufSize: 512 maxNameLength: 255 - maxFieldNum: 64 \ No newline at end of file diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 991828a9cb24897b5bb83b9b02dd88fad871d8cf..7a46f6e254159b4a94a0d26c6382c6a8941a4682 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -52,7 +52,7 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { tenantMeta := pb.TenantMeta{} - err := proto.Unmarshal([]byte(value), &tenantMeta) + err := proto.UnmarshalText(value, &tenantMeta) if err != nil { return err } @@ -66,7 +66,7 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { proxyMeta := pb.ProxyMeta{} - err = proto.Unmarshal([]byte(value), &proxyMeta) + err = proto.UnmarshalText(value, &proxyMeta) if err != nil { return err } @@ -80,7 +80,7 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { collectionMeta := pb.CollectionMeta{} - err = proto.Unmarshal([]byte(value), &collectionMeta) + err = proto.UnmarshalText(value, &collectionMeta) if err != nil { return err } @@ -95,7 +95,7 @@ func (mt *metaTable) reloadFromKV() error { for _, value := range values { segmentMeta := pb.SegmentMeta{} - err = proto.Unmarshal([]byte(value), &segmentMeta) + err = proto.UnmarshalText(value, &segmentMeta) if err != nil { return err } @@ -107,10 +107,7 @@ func (mt *metaTable) reloadFromKV() error { // metaTable.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error { - collBytes, err := proto.Marshal(coll) - if err != nil { - return err - } + collBytes := proto.MarshalTextString(coll) mt.collID2Meta[coll.ID] = *coll mt.collName2ID[coll.Schema.Name] = coll.ID return mt.client.Save("/collection/"+strconv.FormatInt(coll.ID, 10), string(collBytes)) @@ -118,10 +115,7 @@ func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error { // metaTable.ddLock.Lock() before call this function func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error { - segBytes, err := proto.Marshal(seg) - if err != nil { - return err - } + segBytes := proto.MarshalTextString(seg) mt.segID2Meta[seg.SegmentID] = *seg @@ -136,10 +130,7 @@ func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta } kvs := make(map[string]string) - collStrs, err := proto.Marshal(coll) - if err != nil { - return err - } + collStrs := proto.MarshalTextString(coll) kvs["/collection/"+strconv.FormatInt(coll.ID, 10)] = string(collStrs) @@ -159,19 +150,15 @@ func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta // metaTable.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg *pb.SegmentMeta) error { kvs := make(map[string]string) - collBytes, err := proto.Marshal(coll) - if err != nil { - return err - } + collBytes := proto.MarshalTextString(coll) + kvs["/collection/"+strconv.FormatInt(coll.ID, 10)] = string(collBytes) mt.collID2Meta[coll.ID] = *coll mt.collName2ID[coll.Schema.Name] = coll.ID - segBytes, err := proto.Marshal(seg) - if err != nil { - return err - } + segBytes := proto.MarshalTextString(seg) + kvs["/segment/"+strconv.FormatInt(seg.SegmentID, 10)] = string(segBytes) mt.segID2Meta[seg.SegmentID] = *seg @@ -220,7 +207,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error { } if len(coll.PartitionTags) == 0 { - coll.PartitionTags = append(coll.PartitionTags, Params.DefaultPartitionTag) + coll.PartitionTags = append(coll.PartitionTags, "default") } _, ok := mt.collName2ID[coll.Schema.Name] if ok { @@ -292,10 +279,6 @@ func (mt *metaTable) AddPartition(collID UniqueID, tag string) error { return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10)) } - // number of partition tags (except _default) should be limited to 4096 by default - if int64(len(coll.PartitionTags)) > Params.MaxPartitionNum { - return errors.New("maximum partition's number should be limit to " + strconv.FormatInt(Params.MaxPartitionNum, 10)) - } for _, t := range coll.PartitionTags { if t == tag { return errors.Errorf("partition already exists.") @@ -330,29 +313,17 @@ func (mt *metaTable) DeletePartition(collID UniqueID, tag string) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - if tag == Params.DefaultPartitionTag { - return errors.New("default partition cannot be deleted") - } - collMeta, ok := mt.collID2Meta[collID] if !ok { return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10)) } - // check tag exists - exist := false - pt := make([]string, 0, len(collMeta.PartitionTags)) for _, t := range collMeta.PartitionTags { if t != tag { pt = append(pt, t) - } else { - exist = true } } - if !exist { - return errors.New("partition " + tag + " does not exist") - } if len(pt) == len(collMeta.PartitionTags) { return nil } diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 13482124d0bb53e311544abb22cdbabd7973a8fe..674b0162594bbf3c5a1e5dcfccc02435feb53dfb 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -3,7 +3,6 @@ package master import ( "context" "reflect" - "strconv" "testing" "github.com/stretchr/testify/assert" @@ -239,10 +238,6 @@ func TestMetaTable_DeletePartition(t *testing.T) { assert.Equal(t, 1, len(meta.collName2ID)) assert.Equal(t, 1, len(meta.collID2Meta)) assert.Equal(t, 1, len(meta.segID2Meta)) - - // delete not exist - err = meta.DeletePartition(100, "not_exist") - assert.NotNil(t, err) } func TestMetaTable_Segment(t *testing.T) { @@ -371,39 +366,3 @@ func TestMetaTable_UpdateSegment(t *testing.T) { assert.Nil(t, err) assert.Equal(t, seg.NumRows, int64(210)) } - -func TestMetaTable_AddPartition_Limit(t *testing.T) { - Init() - Params.MaxPartitionNum = 256 // adding 4096 partitions is too slow - etcdAddr := Params.EtcdAddress - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - assert.Nil(t, err) - etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") - - _, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix()) - assert.Nil(t, err) - - meta, err := NewMetaTable(etcdKV) - assert.Nil(t, err) - defer meta.client.Close() - - colMeta := pb.CollectionMeta{ - ID: 100, - Schema: &schemapb.CollectionSchema{ - Name: "coll1", - }, - CreateTime: 0, - SegmentIDs: []UniqueID{}, - PartitionTags: []string{}, - } - err = meta.AddCollection(&colMeta) - assert.Nil(t, err) - - for i := 0; i < int(Params.MaxPartitionNum); i++ { - err := meta.AddPartition(100, "partition_"+strconv.Itoa(i)) - assert.Nil(t, err) - } - err = meta.AddPartition(100, "partition_limit") - assert.NotNil(t, err) -} diff --git a/internal/master/param_table.go b/internal/master/param_table.go index 2d32f2e769bbe11ff094e28d88de6a7287b3906b..872bdab34eaf5d12f66e20a101cf24ee3fe6f561 100644 --- a/internal/master/param_table.go +++ b/internal/master/param_table.go @@ -43,9 +43,6 @@ type ParamTable struct { K2SChannelNames []string QueryNodeStatsChannelName string MsgChannelSubName string - - MaxPartitionNum int64 - DefaultPartitionTag string } var Params ParamTable @@ -94,8 +91,6 @@ func (p *ParamTable) Init() { p.initK2SChannelNames() p.initQueryNodeStatsChannelName() p.initMsgChannelSubName() - p.initMaxPartitionNum() - p.initDefaultPartitionTag() } func (p *ParamTable) initAddress() { @@ -416,24 +411,3 @@ func (p *ParamTable) initK2SChannelNames() { } p.K2SChannelNames = channels } - -func (p *ParamTable) initMaxPartitionNum() { - str, err := p.Load("master.maxPartitionNum") - if err != nil { - panic(err) - } - maxPartitionNum, err := strconv.ParseInt(str, 10, 64) - if err != nil { - panic(err) - } - p.MaxPartitionNum = maxPartitionNum -} - -func (p *ParamTable) initDefaultPartitionTag() { - defaultTag, err := p.Load("master.defaultPartitionTag") - if err != nil { - panic(err) - } - - p.DefaultPartitionTag = defaultTag -} diff --git a/internal/master/partition_task.go b/internal/master/partition_task.go index 80a339b2fcc42601d43a03729dd31630c507af2b..28636c930c744e71b902721485d533cb4577e73c 100644 --- a/internal/master/partition_task.go +++ b/internal/master/partition_task.go @@ -191,12 +191,10 @@ func (t *showPartitionTask) Execute() error { return errors.New("null request") } - collMeta, err := t.mt.GetCollectionByName(t.req.CollectionName.CollectionName) - if err != nil { - return err - } partitions := make([]string, 0) - partitions = append(partitions, collMeta.PartitionTags...) + for _, collection := range t.mt.collID2Meta { + partitions = append(partitions, collection.PartitionTags...) + } stringListResponse := servicepb.StringListResponse{ Status: &commonpb.Status{ diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 53b698e64a555b579bdaca20d8469a6f324b1d01..cd784f9780658f3e05bd4353bb0e8f98d379bb9d 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -60,9 +60,6 @@ func TestMaster_Partition(t *testing.T) { K2SChannelNames: []string{"k2s0", "k2s1"}, QueryNodeStatsChannelName: "statistic", MsgChannelSubName: Params.MsgChannelSubName, - - MaxPartitionNum: int64(4096), - DefaultPartitionTag: "_default", } port := 10000 + rand.Intn(1000) @@ -215,7 +212,7 @@ func TestMaster_Partition(t *testing.T) { //assert.Equal(t, collMeta.PartitionTags[0], "partition1") //assert.Equal(t, collMeta.PartitionTags[1], "partition2") - assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, collMeta.PartitionTags) + assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, collMeta.PartitionTags) showPartitionReq := internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, @@ -227,7 +224,7 @@ func TestMaster_Partition(t *testing.T) { stringList, err := cli.ShowPartitions(ctx, &showPartitionReq) assert.Nil(t, err) - assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, stringList.Values) + assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, stringList.Values) showPartitionReq = internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 85942bdd62e0beda68faeab0d10876db026e731d..7063647be7ed51bee8d9d5d158f79e897c0e1cf9 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -261,9 +261,6 @@ func startupMaster() { K2SChannelNames: []string{"k2s0", "k2s1"}, QueryNodeStatsChannelName: "statistic", MsgChannelSubName: Params.MsgChannelSubName, - - MaxPartitionNum: int64(4096), - DefaultPartitionTag: "_default", } master, err = CreateServer(ctx) diff --git a/internal/proxy/paramtable.go b/internal/proxy/paramtable.go index 791927777dfea0bbdc52b78e126b8faba723c520..c53c84509650cb73f2909f994cdd782b40d41bc4 100644 --- a/internal/proxy/paramtable.go +++ b/internal/proxy/paramtable.go @@ -463,15 +463,3 @@ func (pt *ParamTable) MaxNameLength() int64 { } return maxNameLength } - -func (pt *ParamTable) MaxFieldNum() int64 { - str, err := pt.Load("proxy.maxFieldNum") - if err != nil { - panic(err) - } - maxFieldNum, err := strconv.ParseInt(str, 10, 64) - if err != nil { - panic(err) - } - return maxFieldNum -} diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 1d23ddaafda62e9d70c1b6740b410abc4591848d..65e4e214ee1a4ad9bafcce5392a997ce733a7696 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -4,7 +4,6 @@ import ( "context" "errors" "log" - "strconv" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/allocator" @@ -165,10 +164,6 @@ func (cct *CreateCollectionTask) SetTs(ts Timestamp) { } func (cct *CreateCollectionTask) PreExecute() error { - if int64(len(cct.schema.Fields)) > Params.MaxFieldNum() { - return errors.New("maximum field's number should be limited to " + strconv.FormatInt(Params.MaxFieldNum(), 10)) - } - // validate collection name if err := ValidateCollectionName(cct.schema.Name); err != nil { return err diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index 4f892603f954f9faf8d996ef890f98414615746a..8049595f289ea74a78b1ab39ff7b3dbef241f30b 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -68,7 +68,7 @@ func ValidatePartitionTag(partitionTag string, strictCheck bool) error { if strictCheck { firstChar := partitionTag[0] - if firstChar != '_' && !isAlpha(firstChar) && !isNumber(firstChar) { + if firstChar != '_' && !isAlpha(firstChar) { msg := invalidMsg + "The first character of a partition tag must be an underscore or letter." return errors.New(msg) }