提交 7342e075 编写于 作者: B bigsheeper 提交者: yefu.chen

Fix index and flush errors, and fix master crash error

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 2c9e2267
......@@ -103,6 +103,9 @@ func NewMasterService(ctx context.Context) (*MasterService, error) {
is.Params.Init()
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = indexService.Init(); err != nil {
return nil, err
}
if err = svr.SetIndexService(indexService); err != nil {
return nil, err
......
......@@ -456,6 +456,11 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
continue
}
}
err := ibNode.completeFlush(currentSegID)
if err != nil {
log.Println(err)
}
log.Println("Flush completed")
}
}
......
......@@ -2,6 +2,7 @@ package indexservice
import (
"context"
"fmt"
"log"
"sync"
"time"
......@@ -177,6 +178,7 @@ func (i *ServiceImpl) GetStatisticsChannel() (string, error) {
}
func (i *ServiceImpl) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths)
ret := &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......@@ -244,10 +246,13 @@ func (i *ServiceImpl) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.
}
func (i *ServiceImpl) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
var indexPaths []*indexpb.IndexFilePathInfo
var indexPaths []*indexpb.IndexFilePathInfo = nil
for _, indexID := range req.IndexBuildIDs {
indexPathInfo, _ := i.metaTable.GetIndexFilePathInfo(indexID)
indexPathInfo, err := i.metaTable.GetIndexFilePathInfo(indexID)
if err != nil {
return nil, err
}
indexPaths = append(indexPaths, indexPathInfo)
}
......
......@@ -30,6 +30,10 @@ func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest)
if err != nil {
return err
}
err = nodeClient.Init()
if err != nil {
return err
}
item := &PQItem{
value: nodeClient,
key: nodeID,
......
package masterservice
import (
"fmt"
"log"
"github.com/golang/protobuf/proto"
......@@ -597,6 +598,7 @@ func (t *CreateIndexReqTask) Execute() error {
indexParams: t.Req.ExtraParams,
}
t.core.indexTaskQueue <- &task
fmt.Println("create index task enqueue, segID = ", seg)
}
return nil
}
......
......@@ -323,7 +323,6 @@ func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error
}
indexFilePathRequest := &indexpb.IndexFilePathsRequest{
// TODO: rename indexIDs to buildIDs
IndexBuildIDs: []UniqueID{indexBuildID},
}
pathResponse, err := loader.indexClient.GetIndexFilePaths(indexFilePathRequest)
......
......@@ -10,7 +10,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
const indexCheckInterval = 1
const indexCheckInterval = 3
type loadService struct {
ctx context.Context
......@@ -115,6 +115,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return err
}
if errIndex == nil {
fmt.Println("loading index...")
indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
if err != nil {
return err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册