未验证 提交 f2a27e0e 编写于 作者: L Letian Jiang 提交者: GitHub

Retry GetShardLeaders until service available or timeout (#17183)

Signed-off-by: NLetian Jiang <letian.jiang@zilliz.com>
上级 5872c5af
......@@ -326,7 +326,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"eader\022*\n\004base\030\001 \001(\0132\034.milvus.proto.commo"
"n.MsgBase\"M\n\014DMLMsgHeader\022*\n\004base\030\001 \001(\0132"
"\034.milvus.proto.common.MsgBase\022\021\n\tshardNa"
"me\030\002 \001(\t*\222\006\n\tErrorCode\022\013\n\007Success\020\000\022\023\n\017U"
"me\030\002 \001(\t*\252\006\n\tErrorCode\022\013\n\007Success\020\000\022\023\n\017U"
"nexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n\020P"
"ermissionDenied\020\003\022\027\n\023CollectionNotExists"
"\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDimens"
......@@ -345,61 +345,61 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ilure\020\035\022\033\n\027UpdateCredentialFailure\020\036\022\033\n\027"
"DeleteCredentialFailure\020\037\022\030\n\024GetCredenti"
"alFailure\020 \022\030\n\024ListCredUsersFailure\020!\022\022\n"
"\016NotShardLeader\020\"\022\022\n\rDDRequestRace\020\350\007*X\n"
"\nIndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Uniss"
"ued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006F"
"ailed\020\004*\202\001\n\014SegmentState\022\024\n\020SegmentState"
"None\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Sea"
"led\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Drop"
"ped\020\006\022\r\n\tImporting\020\007*>\n\017PlaceholderType\022"
"\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013FloatVect"
"or\020e*\362\n\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020Creat"
"eCollection\020d\022\022\n\016DropCollection\020e\022\021\n\rHas"
"Collection\020f\022\026\n\022DescribeCollection\020g\022\023\n\017"
"ShowCollections\020h\022\024\n\020GetSystemConfigs\020i\022"
"\022\n\016LoadCollection\020j\022\025\n\021ReleaseCollection"
"\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nAl"
"terAlias\020n\022\024\n\017CreatePartition\020\310\001\022\022\n\rDrop"
"Partition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021Descr"
"ibePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016"
"LoadPartitions\020\315\001\022\026\n\021ReleasePartitions\020\316"
"\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegment\020"
"\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegments"
"\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBalance"
"Segments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020\n\013Cr"
"eateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\tDrop"
"Index\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Fl"
"ush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006Searc"
"h\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIndexState"
"\020\366\003\022\032\n\025GetIndexBuildProgress\020\367\003\022\034\n\027GetCo"
"llectionStatistics\020\370\003\022\033\n\026GetPartitionSta"
"tistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveRes"
"ult\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDm"
"Channels\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023"
"RemoveQueryChannels\020\377\003\022\035\n\030SealedSegments"
"ChangeInfo\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\024"
"\n\017GetShardLeaders\020\202\004\022\020\n\013GetReplicas\020\203\004\022\020"
"\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017Get"
"RecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004\022\r\n"
"\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoa"
"dIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020"
"\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentStati"
"stics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDataN"
"odeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGetCr"
"edential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n\020Up"
"dateCredential\020\337\013\022\026\n\021ListCredUsernames\020\340"
"\013*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n"
"\017CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\tE"
"xecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consistency"
"Level\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounde"
"d\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\227\001\n\013"
"ImportState\022\021\n\rImportPending\020\000\022\020\n\014Import"
"Failed\020\001\022\021\n\rImportStarted\020\002\022\024\n\020ImportDow"
"nloaded\020\003\022\020\n\014ImportParsed\020\004\022\023\n\017ImportPer"
"sisted\020\005\022\023\n\017ImportCompleted\020\006BW\n\016io.milv"
"us.grpcB\013CommonProtoP\001Z3github.com/milvu"
"s-io/milvus/internal/proto/commonpb\240\001\001b\006"
"proto3"
"\016NotShardLeader\020\"\022\026\n\022NoReplicaAvailable\020"
"#\022\022\n\rDDRequestRace\020\350\007*X\n\nIndexState\022\022\n\016I"
"ndexStateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgr"
"ess\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004*\202\001\n\014Segm"
"entState\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExi"
"st\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed"
"\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImporti"
"ng\020\007*>\n\017PlaceholderType\022\010\n\004None\020\000\022\020\n\014Bin"
"aryVector\020d\022\017\n\013FloatVector\020e*\362\n\n\007MsgType"
"\022\r\n\tUndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n"
"\016DropCollection\020e\022\021\n\rHasCollection\020f\022\026\n\022"
"DescribeCollection\020g\022\023\n\017ShowCollections\020"
"h\022\024\n\020GetSystemConfigs\020i\022\022\n\016LoadCollectio"
"n\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013CreateAlia"
"s\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\024\n\017Cr"
"eatePartition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014"
"HasPartition\020\312\001\022\026\n\021DescribePartition\020\313\001\022"
"\023\n\016ShowPartitions\020\314\001\022\023\n\016LoadPartitions\020\315"
"\001\022\026\n\021ReleasePartitions\020\316\001\022\021\n\014ShowSegment"
"s\020\372\001\022\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSegmen"
"ts\020\374\001\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSe"
"gments\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020D"
"escribeSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\r"
"DescribeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Inse"
"rt\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022Resend"
"SegmentStats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchRe"
"sult\020\365\003\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexB"
"uildProgress\020\367\003\022\034\n\027GetCollectionStatisti"
"cs\020\370\003\022\033\n\026GetPartitionStatistics\020\371\003\022\r\n\010Re"
"trieve\020\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchD"
"mChannels\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022W"
"atchQueryChannels\020\376\003\022\030\n\023RemoveQueryChann"
"els\020\377\003\022\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n"
"\022WatchDeltaChannels\020\201\004\022\024\n\017GetShardLeader"
"s\020\202\004\022\020\n\013GetReplicas\020\203\004\022\020\n\013SegmentInfo\020\330\004"
"\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332\004\022"
"\024\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016"
"QueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tReq"
"uestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSe"
"gment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Segm"
"entFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020Crea"
"teCredential\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n\020D"
"eleteCredential\020\336\013\022\025\n\020UpdateCredential\020\337"
"\013\022\026\n\021ListCredUsernames\020\340\013*\"\n\007DslType\022\007\n\003"
"Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState"
"\022\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCo"
"mpleted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020"
"\000\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventuall"
"y\020\003\022\016\n\nCustomized\020\004*\227\001\n\013ImportState\022\021\n\rI"
"mportPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImpo"
"rtStarted\020\002\022\024\n\020ImportDownloaded\020\003\022\020\n\014Imp"
"ortParsed\020\004\022\023\n\017ImportPersisted\020\005\022\023\n\017Impo"
"rtCompleted\020\006BW\n\016io.milvus.grpcB\013CommonP"
"rotoP\001Z3github.com/milvus-io/milvus/inte"
"rnal/proto/commonpb\240\001\001b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
......@@ -418,7 +418,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 3606,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 3630,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 10, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 10, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
......@@ -469,6 +469,7 @@ bool ErrorCode_IsValid(int value) {
case 32:
case 33:
case 34:
case 35:
case 1000:
return true;
default:
......
......@@ -142,6 +142,7 @@ enum ErrorCode : int {
GetCredentialFailure = 32,
ListCredUsersFailure = 33,
NotShardLeader = 34,
NoReplicaAvailable = 35,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
......
......@@ -43,6 +43,7 @@ enum ErrorCode {
GetCredentialFailure = 32;
ListCredUsersFailure = 33;
NotShardLeader = 34;
NoReplicaAvailable = 35;
// internal error code.
DDRequestRace = 1000;
......
......@@ -22,6 +22,7 @@ import (
"fmt"
"strconv"
"sync"
"time"
"go.uber.org/zap"
......@@ -35,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
......@@ -607,12 +609,27 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam
},
CollectionID: info.collID,
}
resp, err := qc.GetShardLeaders(ctx, req)
// retry until service available or context timeout
var resp *querypb.GetShardLeadersResponse
childCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err = retry.Do(childCtx, func() error {
resp, err = qc.GetShardLeaders(ctx, req)
if err != nil {
return retry.Unrecoverable(err)
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
return nil
}
// do not retry unless got NoReplicaAvailable from querycoord
if resp.Status.ErrorCode != commonpb.ErrorCode_NoReplicaAvailable {
return retry.Unrecoverable(fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason))
}
return fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
})
if err != nil {
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
return nil, fmt.Errorf("GetShardLeaders timeout, error: %s", err.Error())
}
shards := parseShardLeaderList2QueryNode(resp.GetShards())
......
......@@ -1205,7 +1205,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
if len(shardLeaderLists) == 0 {
return &querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_NoReplicaAvailable,
Reason: "no replica available",
},
}, nil
......
......@@ -23,10 +23,11 @@ import (
"testing"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
......@@ -1695,7 +1696,7 @@ func TestGetShardLeaders(t *testing.T) {
queryCoord.cluster = mockCluster
resp, err = queryCoord.GetShardLeaders(ctx, getShardLeadersReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
assert.Equal(t, commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
// TODO(yah01): Disable the unit test case for now,
// restore it after the rebalance between replicas feature is implemented
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册