提交 10217f94 编写于 作者: N neza2017 提交者: yefu.chen

Add test for etcd kv

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 54296144
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
......
......@@ -515,7 +515,7 @@ type KVBase interface {
MultiRemove(keys []string)
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string)
LoadWithPrefix(key string) ( []string, []string, error)
}
```
......@@ -1076,7 +1076,7 @@ Note that *tenantId*, *proxyId*, *collectionId*, *segmentId* are unique strings
```go
type metaTable struct {
kv *kv.EtcdKV // client of a reliable kv service, i.e. etcd client
kv kv.Base // client of a reliable kv service, i.e. etcd client
tenantId2Meta map[int64]TenantMeta // tenant id to tenant meta
proxyId2Meta map[int64]ProxyMeta // proxy id to proxy meta
collId2Meta map[int64]CollectionMeta // collection id to collection meta
......@@ -1104,10 +1104,11 @@ func (meta *metaTable) HasPartition(collId int64, tag string) bool
func (meta *metaTable) DeletePartition(collId int64, tag string) error
func (meta *metaTable) AddSegment(seg *SegmentMeta) error
func (meta *metaTable) GetSegmentById(segId int64)(*SegmentMeta, error)
func (meta *metaTable) DeleteSegment(segId int64) error
func (meta *metaTable) CloseSegment(segId int64, closeTs Timestamp, num_rows int64) error
func NewMetaTable(kv *kv.EtcdKV) *metaTable
func NewMetaTable(kv kv.Base) (*metaTable,error)
```
*metaTable* maintains meta both in memory and *etcdKV*. It keeps meta's consistency in both sides. All its member functions may be called concurrently.
......
......@@ -3,40 +3,24 @@ module github.com/zilliztech/milvus-distributed
go 1.15
require (
cloud.google.com/go/bigquery v1.4.0 // indirect
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48
github.com/99designs/keyring v1.1.5 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
github.com/apache/pulsar-client-go v0.1.1
github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af
github.com/aws/aws-sdk-go v1.30.8
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/docker/go-units v0.4.0
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
github.com/frankban/quicktest v1.10.2 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.4 // indirect
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/google/martian/v3 v3.0.0 // indirect
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.3.2
github.com/json-iterator/go v1.1.10
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/keybase/go-keychain v0.0.0-20200502122510-cda31fe0c86d // indirect
github.com/klauspost/compress v1.10.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/minio-go/v7 v7.0.5
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.1 // indirect
github.com/onsi/gomega v1.10.0 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.4 // indirect
......@@ -45,32 +29,28 @@ require (
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
github.com/rs/xid v1.2.1
github.com/sirupsen/logrus v1.6.0
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.6.1
github.com/tikv/client-go v0.0.0-20200824032810-95774393107b
github.com/tikv/pd v2.1.19+incompatible
github.com/yahoo/athenz v1.9.16 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.opencensus.io v0.22.4 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.22.0 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 // indirect
google.golang.org/grpc v1.31.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
k8s.io/utils v0.0.0-20200912215256-4140de9c8800 // indirect
rsc.io/quote/v3 v3.1.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
......
此差异已折叠。
......@@ -2,23 +2,15 @@ package kv
import (
"context"
"path"
"time"
"github.com/pingcap/log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/util/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"log"
"path"
"time"
)
const (
requestTimeout = 10 * time.Second
slowRequestTime = 1 * time.Second
)
var (
errTxnFailed = errors.New("failed to commit transaction")
requestTimeout = 10 * time.Second
)
type EtcdKV struct {
......@@ -38,72 +30,49 @@ func (kv *EtcdKV) Close() {
kv.client.Close()
}
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string) {
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
key = path.Join(kv.rootPath, key)
println("in loadWithPrefix,", key)
resp, err := etcdutil.EtcdKVGet(kv.client, key, clientv3.WithPrefix())
log.Printf("LoadWithPrefix %s", key)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return []string{}, []string{}
return nil, nil, err
}
var keys []string
var values []string
for _, kvs := range resp.Kvs {
//println(len(kvs.))
if len(kvs.Key) <= 0 {
println("KKK")
continue
}
keys = append(keys, string(kvs.Key))
values = append(values, string(kvs.Value))
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
}
//println(keys)
//println(values)
return keys, values
return keys, values, nil
}
func (kv *EtcdKV) Load(key string) (string, error) {
key = path.Join(kv.rootPath, key)
resp, err := etcdutil.EtcdKVGet(kv.client, key)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
resp, err := kv.client.Get(ctx, key)
if err != nil {
return "", err
}
if n := len(resp.Kvs); n == 0 {
return "", nil
} else if n > 1 {
return "", errors.Errorf("load more than one kvs: key %v kvs %v", key, n)
if resp.Count <= 0 {
return "", errors.Errorf("there is no value on key = %s", key)
}
return string(resp.Kvs[0].Value), nil
}
func (kv *EtcdKV) Save(key, value string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpPut(key, value)).Commit()
if err != nil {
log.Error("save to etcd meet error", zap.String("key", key), zap.String("value", value))
return errors.WithStack(err)
}
if !resp.Succeeded {
return errors.WithStack(errTxnFailed)
}
return nil
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
_, err := kv.client.Put(ctx, key, value)
return err
}
func (kv *EtcdKV) Remove(key string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpDelete(key)).Commit()
if err != nil {
log.Error("remove from etcd meet error", zap.String("key", key))
return errors.WithStack(err)
}
if !resp.Succeeded {
return errors.WithStack(errTxnFailed)
}
return nil
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
_, err := kv.client.Delete(ctx, key)
return err
}
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
......@@ -117,59 +86,3 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix())
return rch
}
// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
cancel context.CancelFunc
}
// NewSlowLogTxn create a SlowLogTxn.
func NewSlowLogTxn(client *clientv3.Client) clientv3.Txn {
ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout)
return &SlowLogTxn{
Txn: client.Txn(ctx),
cancel: cancel,
}
}
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
func (t *SlowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
return &SlowLogTxn{
Txn: t.Txn.If(cs...),
cancel: t.cancel,
}
}
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
func (t *SlowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn {
return &SlowLogTxn{
Txn: t.Txn.Then(ops...),
cancel: t.cancel,
}
}
// Commit implements Txn Commit interface.
func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) {
start := time.Now()
resp, err := t.Txn.Commit()
t.cancel()
cost := time.Since(start)
if cost > slowRequestTime {
log.Warn("txn runs too slow",
zap.Error(err),
zap.Reflect("response", resp),
zap.Duration("cost", cost))
}
//label := "success"
//if err != nil {
// label = "failed"
//}
//txnCounter.WithLabelValues(label).Inc()
//txnDuration.WithLabelValues(label).Observe(cost.Seconds())
return resp, errors.WithStack(err)
}
package kv
import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3"
"path"
"testing"
)
func TestEtcdKV_Load(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
kv := NewEtcdKV(cli, "/etcd/test/root")
defer kv.Close()
err = kv.Save("abc", "123")
assert.Nil(t, err)
err = kv.Save("abcd", "1234")
assert.Nil(t, err)
val, err := kv.Load("abc")
assert.Nil(t, err)
assert.Equal(t, val, "123")
keys, vals, err := kv.LoadWithPrefix("abc")
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], path.Join(kv.rootPath, "abc"))
assert.Equal(t, keys[1], path.Join(kv.rootPath, "abcd"))
assert.Equal(t, vals[0], "123")
assert.Equal(t, vals[1], "1234")
}
......@@ -8,6 +8,6 @@ type Base interface {
Remove(key string) error
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string)
LoadWithPrefix(key string) ([]string, []string, error)
Close()
}
......@@ -98,7 +98,7 @@ func (t *createCollectionTask) Execute() error {
return errors.New("save collection failed")
}
t.mt.collMeta[collectionId] = collection
t.mt.collId2Meta[collectionId] = collection
_ = t.Notify()
return nil
......@@ -141,7 +141,7 @@ func (t *dropCollectionTask) Execute() error {
return errors.New("save collection failed")
}
delete(t.mt.collMeta, collectionId)
delete(t.mt.collId2Meta, collectionId)
_ = t.Notify()
return nil
......@@ -244,7 +244,7 @@ func (t *showCollectionsTask) Execute() error {
}
collections := make([]string, 0)
for _, collection := range t.mt.collMeta {
for _, collection := range t.mt.collId2Meta {
collections = append(collections, collection.Schema.Name)
}
......
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package master
import (
......@@ -19,9 +6,9 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"google.golang.org/grpc"
......@@ -200,7 +187,7 @@ func (s *Master) grpcLoop() {
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
log.Println("failed to listen: %v", err)
log.Printf("failed to listen: %v", err)
return
}
......@@ -247,7 +234,7 @@ func (s *Master) pulsarLoop() {
case msg := <-consumerChan:
var m internalpb.SegmentStatistics
proto.Unmarshal(msg.Payload(), &m)
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
fmt.Printf("Received message msgId: %#v -- content: '%d'\n",
msg.ID(), m.SegmentId)
s.ssChan <- m
consumer.Ack(msg)
......
package master
import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"strconv"
"sync"
)
type metaTable struct {
client kv.Base // client of a reliable kv service, i.e. etcd client
rootPath string // this metaTable's working root path on the reliable kv service
tenantMeta map[int64]etcdpb.TenantMeta // tenant id to tenant meta
proxyMeta map[int64]etcdpb.ProxyMeta // proxy id to proxy meta
collMeta map[int64]etcdpb.CollectionMeta // collection id to collection meta
segMeta map[int64]etcdpb.SegmentMeta // segment id to segment meta
client kv.Base // client of a reliable kv service, i.e. etcd client
tenantId2Meta map[int64]pb.TenantMeta // tenant id to tenant meta
proxyId2Meta map[int64]pb.ProxyMeta // proxy id to proxy meta
collId2Meta map[int64]pb.CollectionMeta // collection id to collection meta
collName2Id map[string]int64 // collection name to collection id
segId2Meta map[int64]pb.SegmentMeta // segment id to segment meta
tenantLock sync.RWMutex
proxyLock sync.RWMutex
ddLock sync.RWMutex
}
func (mt *metaTable) GetCollectionByName(collectionName string) (*etcdpb.CollectionMeta, error) {
for _, v := range mt.collMeta {
if v.Schema.Name == collectionName {
return &v, nil
}
//todo, load meta from etcd
func NewMetaTable(kv kv.Base) (*metaTable, error) {
return &metaTable{
client: kv,
tenantId2Meta: make(map[int64]pb.TenantMeta),
proxyId2Meta: make(map[int64]pb.ProxyMeta),
collId2Meta: make(map[int64]pb.CollectionMeta),
collName2Id: make(map[string]int64),
segId2Meta: make(map[int64]pb.SegmentMeta),
tenantLock: sync.RWMutex{},
proxyLock: sync.RWMutex{},
ddLock: sync.RWMutex{},
}, nil
}
// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
coll_bytes, err := proto.Marshal(coll)
if err != nil {
return err
}
err = mt.client.Save(strconv.FormatInt(coll.Id, 10), string(coll_bytes))
if err != nil {
return err
}
mt.collId2Meta[coll.Id] = *coll
mt.collName2Id[coll.Schema.Name] = coll.Id
return nil
}
// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
seg_bytes, err := proto.Marshal(seg)
if err != nil {
return err
}
err = mt.client.Save(strconv.FormatInt(seg.SegmentId, 10), string(seg_bytes))
if err != nil {
return err
}
mt.segId2Meta[seg.SegmentId] = *seg
return nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
return mt.saveCollectionMeta(coll)
}
func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionMeta, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return nil, errors.New("Cannot found collection: " + collectionName)
vid, ok := mt.collName2Id[collectionName]
if !ok {
return nil, errors.Errorf("can't find collection: " + collectionName)
}
col, ok := mt.collId2Meta[vid]
if !ok {
return nil, errors.Errorf("can't find collection: " + collectionName)
}
return &col, nil
}
func (mt *metaTable) HasPartition(partitionTag, collectionName string) bool {
var hasPartition = false
for _, v := range mt.collMeta {
if v.Schema.Name == collectionName {
for _, tag := range v.PartitionTags {
if tag == partitionTag {
hasPartition = true
}
}
col_meta, err := mt.GetCollectionByName(collectionName)
if err != nil {
return false
}
for _, tag := range col_meta.PartitionTags {
if tag == partitionTag {
return true
}
}
return hasPartition
return false
}
func (mt *metaTable) DeletePartition(partitionTag, collectionName string) error {
var tmpPartitionTags = make([]string, 0)
var hasPartition = false
for _, v := range mt.collMeta {
if v.Schema.Name == collectionName {
for _, tag := range v.PartitionTags {
if tag == partitionTag {
hasPartition = true
} else {
tmpPartitionTags = append(tmpPartitionTags, tag)
}
}
if !hasPartition {
return errors.New("Cannot found partition: " + partitionTag + " in collection: " + collectionName)
} else {
v.PartitionTags = tmpPartitionTags
return nil
}
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
col_id, ok := mt.collName2Id[collectionName]
if !ok {
return errors.Errorf("can't find collection %s", collectionName)
}
col_meta, ok := mt.collId2Meta[col_id]
if !ok {
return errors.Errorf("can't find collection %s", collectionName)
}
pt := make([]string, 0, len(col_meta.PartitionTags))
for _, t := range col_meta.PartitionTags {
if t != partitionTag {
pt = append(pt, t)
}
}
if len(pt) == len(col_meta.PartitionTags) {
return nil
}
seg := make([]int64, 0, len(col_meta.SegmentIds))
for _, s := range col_meta.SegmentIds {
sm, ok := mt.segId2Meta[s]
if !ok {
return errors.Errorf("can't find segment id = %d", s)
}
if sm.PartitionTag != partitionTag {
seg = append(seg, s)
}
}
col_meta.PartitionTags = pt
col_meta.SegmentIds = seg
return mt.saveCollectionMeta(&col_meta)
}
return errors.New("Cannot found collection: " + collectionName)
func (mt *metaTable) AddSegment(seg *pb.SegmentMeta) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
return mt.saveSegmentMeta(seg)
}
func (mt *metaTable) GetSegmentById(segId int64) (*pb.SegmentMeta, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
sm, ok := mt.segId2Meta[segId]
if !ok {
return nil, errors.Errorf("can't find segment id = %d", segId)
}
return &sm, nil
}
package master
import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/kv"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"go.etcd.io/etcd/clientv3"
"testing"
)
func TestMetaTable_DeletePartition(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
etcd_kv := kv.NewEtcdKV(cli, "/etcd/test/root")
meta, err := NewMetaTable(etcd_kv)
assert.Nil(t, err)
defer meta.client.Close()
col_meta := pb.CollectionMeta{
Id: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIds: []int64{200},
PartitionTags: []string{"p1"},
}
seg_id := pb.SegmentMeta{
SegmentId: 200,
CollectionId: 100,
PartitionTag: "p1",
}
err = meta.AddCollection(&col_meta)
assert.Nil(t, err)
err = meta.AddSegment(&seg_id)
assert.Nil(t, err)
err = meta.DeletePartition("p1", "coll1")
assert.Nil(t, err)
}
......@@ -231,7 +231,7 @@ func (t *showPartitionTask) Execute() error {
}
partitions := make([]string, 0)
for _, collection := range t.mt.collMeta {
for _, collection := range t.mt.collId2Meta {
for _, partition := range collection.PartitionTags {
partitions = append(partitions, partition)
}
......
......@@ -11,8 +11,8 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/collection"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/collection"
"github.com/zilliztech/milvus-distributed/internal/master/segment"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
......@@ -263,7 +263,10 @@ func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
}
func (node *QueryNode) loadCollections() error {
keys, values := node.kvBase.LoadWithPrefix(CollectionPrefix)
keys, values, err := node.kvBase.LoadWithPrefix(CollectionPrefix)
if err != nil {
return err
}
for i := range keys {
objID := GetCollectionObjId(keys[i])
node.processCollectionCreate(objID, values[i])
......@@ -271,7 +274,10 @@ func (node *QueryNode) loadCollections() error {
return nil
}
func (node *QueryNode) loadSegments() error {
keys, values := node.kvBase.LoadWithPrefix(SegmentPrefix)
keys, values, err := node.kvBase.LoadWithPrefix(SegmentPrefix)
if err != nil {
return err
}
for i := range keys {
objID := GetSegmentObjId(keys[i])
node.processSegmentCreate(objID, values[i])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册