提交 33e1e170 编写于 作者: B bigsheeper 提交者: yefu.chen

Fix writer client DescribeSegment api

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 0a383608
......@@ -68,6 +68,18 @@ func (kv *EtcdKV) Load(key string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
func (kv *EtcdKV) GetCount(key string) (int64, error) {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key)
if err != nil {
return -1, err
}
return resp.Count, nil
}
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys {
......
......@@ -90,12 +90,12 @@ func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, er
}
type LoadIndexClient interface {
LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error
LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error
}
type MockLoadIndexClient struct {
}
func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error {
func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error {
return nil
}
......@@ -133,7 +133,6 @@ func (scheduler *IndexBuildScheduler) describe() error {
fieldID: indexBuildInfo.fieldID,
fieldName: fieldName,
indexFilePaths: filePaths,
indexParams: channelInfo.indexParams,
}
// Save data to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
......
......@@ -3,15 +3,12 @@ package master
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type IndexLoadInfo struct {
segmentID UniqueID
fieldID UniqueID
fieldName string
indexParams []*commonpb.KeyValuePair
indexFilePaths []string
}
......@@ -39,11 +36,7 @@ func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTabl
func (scheduler *IndexLoadScheduler) schedule(info interface{}) error {
indexLoadInfo := info.(*IndexLoadInfo)
indexParams := make(map[string]string)
for _, kv := range indexLoadInfo.indexParams {
indexParams[kv.Key] = kv.Value
}
err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName, indexParams)
err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName)
//TODO: Save data to meta table
if err != nil {
return err
......
......@@ -68,7 +68,6 @@ func (task *createIndexTask) Execute() error {
fieldID: fieldID,
fieldName: task.req.FieldName,
indexFilePaths: indexMeta.IndexFilePaths,
indexParams: indexMeta.IndexParams,
})
if err != nil {
return err
......
......@@ -10,12 +10,6 @@ import (
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
......@@ -181,15 +175,9 @@ func CreateServer(ctx context.Context) (*Master, error) {
m.scheduler.SetDDMsgStream(pulsarDDStream)
m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
if err != nil {
return nil, err
}
buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress)
if err != nil {
return nil, err
}
loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames)
flushClient := &MockWriteNodeClient{}
buildIndexClient := &MockBuildIndexClient{}
loadIndexClient := &MockLoadIndexClient{}
m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable)
m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)
......
......@@ -50,8 +50,6 @@ type ParamTable struct {
MaxPartitionNum int64
DefaultPartitionTag string
LoadIndexChannelNames []string
}
var Params ParamTable
......@@ -99,8 +97,6 @@ func (p *ParamTable) Init() {
p.initMsgChannelSubName()
p.initMaxPartitionNum()
p.initDefaultPartitionTag()
p.initLoadIndexChannelNames()
}
func (p *ParamTable) initAddress() {
......@@ -360,11 +356,3 @@ func (p *ParamTable) initDefaultPartitionTag() {
p.DefaultPartitionTag = defaultTag
}
func (p *ParamTable) initLoadIndexChannelNames() {
loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd")
if err != nil {
panic(err)
}
p.LoadIndexChannelNames = []string{loadIndexChannelName}
}
......@@ -6,6 +6,7 @@ import (
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
......@@ -79,6 +80,21 @@ func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error
}
key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
etcdKV, ok := c.kvClient.(*etcdkv.EtcdKV)
if !ok {
return nil, errors.New("type assertion failed for etcd kv")
}
count, err := etcdKV.GetCount(key)
if err != nil {
return nil, err
}
if count <= 0 {
ret.IsClosed = false
return ret, nil
}
value, err := c.kvClient.Load(key)
if err != nil {
return ret, err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册