提交 a03ab05c 编写于 作者: S sunby 提交者: yefu.chen

Add DescribeIndex and DescribeIndexProgress RPC

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 84a0cf70
......@@ -2,6 +2,7 @@ package master
import (
"context"
"fmt"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -444,6 +445,9 @@ func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.Assign
}
func (s *Master) CreateIndex(ctx context.Context, req *internalpb.CreateIndexRequest) (*commonpb.Status, error) {
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
}
task := &createIndexTask{
baseTask: baseTask{
sch: s.scheduler,
......@@ -458,30 +462,78 @@ func (s *Master) CreateIndex(ctx context.Context, req *internalpb.CreateIndexReq
err := s.scheduler.Enqueue(task)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed: " + err.Error(),
}, nil
ret.Reason = "Enqueue failed: " + err.Error()
return ret, nil
}
err = task.WaitToFinish(ctx)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Create Index error: " + err.Error(),
}, nil
ret.Reason = "Create Index error: " + err.Error()
return ret, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
ret.ErrorCode = commonpb.ErrorCode_SUCCESS
return ret, nil
}
func (s *Master) DescribeIndex(context.Context, *internalpb.DescribeIndexRequest) (*servicepb.DescribeIndexResponse, error) {
return nil, nil
func (s *Master) DescribeIndex(ctx context.Context, req *internalpb.DescribeIndexRequest) (*servicepb.DescribeIndexResponse, error) {
resp := &servicepb.DescribeIndexResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
CollectionName: req.CollectionName,
FieldName: req.FieldName,
}
task := &describeIndexTask{
baseTask: baseTask{
sch: s.scheduler,
mt: s.metaTable,
cv: make(chan error),
},
req: req,
resp: resp,
}
if err := s.scheduler.Enqueue(task); err != nil {
resp.Status.Reason = fmt.Sprintf("Enqueue failed: %s", err.Error())
return resp, nil
}
if err := task.WaitToFinish(ctx); err != nil {
resp.Status.Reason = fmt.Sprintf("Describe Index failed: %s", err.Error())
return resp, nil
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return resp, nil
}
func (s *Master) DescribeIndexProgress(context.Context, *internalpb.DescribeIndexProgressRequest) (*servicepb.BoolResponse, error) {
return nil, nil
func (s *Master) DescribeIndexProgress(ctx context.Context, req *internalpb.DescribeIndexProgressRequest) (*servicepb.BoolResponse, error) {
resp := &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
Value: false,
}
task := &describeIndexProgressTask{
baseTask: baseTask{
sch: s.scheduler,
mt: s.metaTable,
cv: make(chan error),
},
req: req,
resp: resp,
}
if err := s.scheduler.Enqueue(task); err != nil {
resp.Status.Reason = "Enqueue failed :" + err.Error()
return resp, nil
}
if err := task.WaitToFinish(ctx); err != nil {
resp.Status.Reason = "Describe index progress failed:" + err.Error()
return resp, nil
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return resp, nil
}
......@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)
type createIndexTask struct {
......@@ -93,3 +94,89 @@ func (task *createIndexTask) Execute() error {
// close unfilled segment
return task.segManager.ForceClose(collMeta.ID)
}
type describeIndexTask struct {
baseTask
req *internalpb.DescribeIndexRequest
resp *servicepb.DescribeIndexResponse
}
func (task *describeIndexTask) Type() internalpb.MsgType {
return internalpb.MsgType_kDescribeIndex
}
func (task *describeIndexTask) Ts() (Timestamp, error) {
return task.req.Timestamp, nil
}
func (task *describeIndexTask) Execute() error {
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field %s", task.req.FieldName)
}
indexParams, err := task.mt.GetFieldIndexParams(collMeta.ID, fieldID)
if err != nil {
return err
}
task.resp.ExtraParams = indexParams
return nil
}
type describeIndexProgressTask struct {
baseTask
req *internalpb.DescribeIndexProgressRequest
runtimeStats *RuntimeStats
resp *servicepb.BoolResponse
}
func (task *describeIndexProgressTask) Type() internalpb.MsgType {
return internalpb.MsgType_kDescribeIndexProgress
}
func (task *describeIndexProgressTask) Ts() (Timestamp, error) {
return task.req.Timestamp, nil
}
func (task *describeIndexProgressTask) Execute() error {
// get field id, collection id
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field %s", task.req.FieldName)
}
// total segment nums
totalSegmentNums := len(collMeta.SegmentIDs)
// get completed segment nums from querynode's runtime stats
relatedSegments := task.runtimeStats.GetTotalNumOfRelatedSegments(collMeta.ID, fieldID, task.req.ExtraParams)
if int64(totalSegmentNums) == relatedSegments {
task.resp.Value = true
} else {
task.resp.Value = false
}
return nil
}
package master
import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type RuntimeStats struct {
collStats map[UniqueID]*CollRuntimeStats // collection id to array of field statistic
mu sync.RWMutex
}
func (rs *RuntimeStats) UpdateFieldStat(collID UniqueID, fieldID UniqueID, stats *FieldRuntimeStats) error {
func (rs *RuntimeStats) UpdateFieldStat(collID UniqueID, fieldID UniqueID, stats *FieldIndexRuntimeStats) error {
rs.mu.Lock()
defer rs.mu.Unlock()
peerID := stats.peerID
_, ok := rs.collStats[collID]
if !ok {
rs.collStats[collID] = &CollRuntimeStats{
fieldStats: make(map[UniqueID][]*FieldRuntimeStats),
fieldIndexStats: make(map[UniqueID][]*FieldIndexRuntimeStats),
}
}
collRuntimeStats := rs.collStats[collID]
fieldStats := collRuntimeStats.fieldStats[fieldID]
fieldStats := collRuntimeStats.fieldIndexStats[fieldID]
for i, v := range fieldStats {
if v.peerID == peerID && typeutil.CompareIndexParams(v.indexParams, stats.indexParams) {
fieldStats[i] = stats
......@@ -27,15 +33,39 @@ func (rs *RuntimeStats) UpdateFieldStat(collID UniqueID, fieldID UniqueID, stats
}
}
collRuntimeStats.fieldStats[fieldID] = append(collRuntimeStats.fieldStats[fieldID], stats)
collRuntimeStats.fieldIndexStats[fieldID] = append(collRuntimeStats.fieldIndexStats[fieldID], stats)
return nil
}
func (rs *RuntimeStats) GetTotalNumOfRelatedSegments(collID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) int64 {
rs.mu.RLock()
defer rs.mu.RUnlock()
collStats, ok := rs.collStats[collID]
if !ok {
return 0
}
fieldStats, ok := collStats.fieldIndexStats[fieldID]
if !ok {
return 0
}
var total int64 = 0
for _, stat := range fieldStats {
if typeutil.CompareIndexParams(stat.indexParams, indexParams) {
total += stat.numOfRelatedSegments
}
}
return total
}
type CollRuntimeStats struct {
fieldStats map[UniqueID][]*FieldRuntimeStats
fieldIndexStats map[UniqueID][]*FieldIndexRuntimeStats
}
type FieldRuntimeStats struct {
type FieldIndexRuntimeStats struct {
peerID int64
indexParams []*commonpb.KeyValuePair
numOfRelatedSegments int64
......
......@@ -22,13 +22,13 @@ func TestRuntimeStats_UpdateFieldStats(t *testing.T) {
{1, 1, 2, 100},
}
for _, testcase := range cases {
err := runtimeStats.UpdateFieldStat(testcase.collID, testcase.fieldID, &FieldRuntimeStats{
err := runtimeStats.UpdateFieldStat(testcase.collID, testcase.fieldID, &FieldIndexRuntimeStats{
peerID: testcase.peerID,
indexParams: []*commonpb.KeyValuePair{},
numOfRelatedSegments: testcase.nums,
})
assert.Nil(t, err)
statsArray := runtimeStats.collStats[testcase.collID].fieldStats[testcase.fieldID]
statsArray := runtimeStats.collStats[testcase.collID].fieldIndexStats[testcase.fieldID]
assert.NotEmpty(t, statsArray)
found := 0
......
......@@ -8,7 +8,7 @@ import (
type StatsProcessor struct {
metaTable *metaTable
runTimeStats *RuntimeStats
runtimeStats *RuntimeStats
segmentThreshold float64
segmentThresholdFactor float64
......@@ -61,13 +61,13 @@ func (processor *StatsProcessor) processFieldStat(peerID int64, fieldStats *inte
fieldID := fieldStats.FieldID
for _, stat := range fieldStats.IndexStats {
fieldStats := &FieldRuntimeStats{
fieldStats := &FieldIndexRuntimeStats{
peerID: peerID,
indexParams: stat.IndexParams,
numOfRelatedSegments: stat.NumRelatedSegments,
}
if err := processor.runTimeStats.UpdateFieldStat(collID, fieldID, fieldStats); err != nil {
if err := processor.runtimeStats.UpdateFieldStat(collID, fieldID, fieldStats); err != nil {
return err
}
}
......@@ -77,7 +77,7 @@ func (processor *StatsProcessor) processFieldStat(peerID int64, fieldStats *inte
func NewStatsProcessor(mt *metaTable, runTimeStats *RuntimeStats, globalTSOAllocator func() (Timestamp, error)) *StatsProcessor {
return &StatsProcessor{
metaTable: mt,
runTimeStats: runTimeStats,
runtimeStats: runTimeStats,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
globalTSOAllocator: globalTSOAllocator,
......
......@@ -110,7 +110,7 @@ func TestStatsProcess(t *testing.T) {
assert.Equal(t, int64(2500000), segMeta.MemSize)
assert.Equal(t, int64(25000), segMeta.NumRows)
assert.EqualValues(t, 100, runtimeStats.collStats[1].fieldStats[100][0].numOfRelatedSegments)
assert.EqualValues(t, 200, runtimeStats.collStats[2].fieldStats[100][0].numOfRelatedSegments)
assert.EqualValues(t, 100, runtimeStats.collStats[1].fieldIndexStats[100][0].numOfRelatedSegments)
assert.EqualValues(t, 200, runtimeStats.collStats[2].fieldIndexStats[100][0].numOfRelatedSegments)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册