未验证 提交 c8420cfa 编写于 作者: C congqixia 提交者: GitHub

Add proxy healthy check for working services (#5954)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 f0ca2ac7
......@@ -97,6 +97,10 @@ func (node *ProxyNode) ReleaseDQLMessageStream(ctx context.Context, request *pro
zap.Any("db", request.DbID),
zap.Any("collection", request.CollectionID))
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
_ = node.chMgr.removeDQLStream(request.CollectionID)
log.Debug("ReleaseDQLMessageStream Done",
......@@ -111,6 +115,9 @@ func (node *ProxyNode) ReleaseDQLMessageStream(ctx context.Context, request *pro
}
func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
cct := &CreateCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -157,6 +164,9 @@ func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.C
}
func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
dct := &DropCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -202,6 +212,11 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro
}
func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
if !node.checkHealthy() {
return &milvuspb.BoolResponse{
Status: unhealthyStatus(),
}, nil
}
hct := &HasCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -249,7 +264,9 @@ func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasC
}
func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
lct := &LoadCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -293,6 +310,9 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa
}
func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
rct := &ReleaseCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -337,6 +357,11 @@ func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.
}
func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if !node.checkHealthy() {
return &milvuspb.DescribeCollectionResponse{
Status: unhealthyStatus(),
}, nil
}
dct := &DescribeCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -384,6 +409,11 @@ func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb
}
func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
if !node.checkHealthy() {
return &milvuspb.GetCollectionStatisticsResponse{
Status: unhealthyStatus(),
}, nil
}
g := &GetCollectionStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -431,6 +461,11 @@ func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *mil
}
func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
if !node.checkHealthy() {
return &milvuspb.ShowCollectionsResponse{
Status: unhealthyStatus(),
}, nil
}
sct := &ShowCollectionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -477,6 +512,9 @@ func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.Sh
}
func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
cpt := &CreatePartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -522,6 +560,9 @@ func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.Cr
}
func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
dpt := &DropPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -568,6 +609,11 @@ func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.Drop
}
func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
if !node.checkHealthy() {
return &milvuspb.BoolResponse{
Status: unhealthyStatus(),
}, nil
}
hpt := &HasPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -620,6 +666,9 @@ func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPa
}
func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
lpt := &LoadPartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -665,6 +714,9 @@ func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.Loa
}
func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
rpt := &ReleasePartitionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -710,6 +762,11 @@ func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.
}
func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
if !node.checkHealthy() {
return &milvuspb.GetPartitionStatisticsResponse{
Status: unhealthyStatus(),
}, nil
}
g := &GetPartitionStatisticsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -759,6 +816,11 @@ func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milv
}
func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
if !node.checkHealthy() {
return &milvuspb.ShowPartitionsResponse{
Status: unhealthyStatus(),
}, nil
}
spt := &ShowPartitionsTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -807,6 +869,9 @@ func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.Sho
}
func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
cit := &CreateIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -854,6 +919,11 @@ func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.Create
}
func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
if !node.checkHealthy() {
return &milvuspb.DescribeIndexResponse{
Status: unhealthyStatus(),
}, nil
}
dit := &DescribeIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -909,6 +979,9 @@ func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.Desc
}
func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
dit := &DropIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -956,6 +1029,11 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
func (node *ProxyNode) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
if !node.checkHealthy() {
return &milvuspb.GetIndexBuildProgressResponse{
Status: unhealthyStatus(),
}, nil
}
gibpt := &GetIndexBuildProgressTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1011,6 +1089,11 @@ func (node *ProxyNode) GetIndexBuildProgress(ctx context.Context, request *milvu
}
func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
if !node.checkHealthy() {
return &milvuspb.GetIndexStateResponse{
Status: unhealthyStatus(),
}, nil
}
dipt := &GetIndexStateTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1063,6 +1146,11 @@ func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.GetI
}
func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
if !node.checkHealthy() {
return &milvuspb.MutationResult{
Status: unhealthyStatus(),
}, nil
}
it := &InsertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1156,6 +1244,11 @@ func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertReque
}
func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
if !node.checkHealthy() {
return &milvuspb.SearchResults{
Status: unhealthyStatus(),
}, nil
}
qt := &SearchTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1229,6 +1322,11 @@ func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchReque
}
func (node *ProxyNode) Retrieve(ctx context.Context, request *milvuspb.RetrieveRequest) (*milvuspb.RetrieveResults, error) {
if !node.checkHealthy() {
return &milvuspb.RetrieveResults{
Status: unhealthyStatus(),
}, nil
}
rt := &RetrieveTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1288,6 +1386,9 @@ func (node *ProxyNode) Retrieve(ctx context.Context, request *milvuspb.RetrieveR
}
func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
if !node.checkHealthy() {
return unhealthyStatus(), nil
}
ft := &FlushTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -1331,6 +1432,11 @@ func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest
}
func (node *ProxyNode) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
if !node.checkHealthy() {
return &milvuspb.QueryResults{
Status: unhealthyStatus(),
}, nil
}
schemaPb, err := globalMetaCache.GetCollectionSchema(ctx, request.CollectionName)
if err != nil { // err is not nil if collection not exists
return nil, err
......@@ -1467,6 +1573,10 @@ func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvus
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
return resp, nil
}
segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
if err != nil {
resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
......@@ -1516,6 +1626,10 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Ge
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
return resp, nil
}
segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
if err != nil {
resp.Status.Reason = err.Error()
......@@ -1689,3 +1803,16 @@ func (node *ProxyNode) RegisterLink(ctx context.Context, req *milvuspb.RegisterL
},
}, nil
}
// checkHealthy checks proxy node state is Healthy
func (node *ProxyNode) checkHealthy() bool {
code := node.stateCode.Load().(internalpb.StateCode)
return code == internalpb.StateCode_Healthy
}
func unhealthyStatus() *commonpb.Status {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "proxy node not healthy",
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册