未验证 提交 95e762bb 编写于 作者: X Xiangyu Wang 提交者: GitHub

Add logs in retrieve request process (#5460)

Signed-off-by: NXiangyu Wang <xiangyu.wang@zilliz.com>
上级 cd549f81
......@@ -158,6 +158,8 @@ func (node *ProxyNode) Init() error {
Params.SearchChannelNames = []string{resp.RequestChannel}
Params.SearchResultChannelNames = []string{resp.ResultChannel}
Params.RetrieveChannelNames = []string{resp.RequestChannel}
Params.RetrieveResultChannelNames = []string{resp.ResultChannel}
}
// todo
......
......@@ -1205,18 +1205,30 @@ func (rt *RetrieveTask) PreExecute(ctx context.Context) error {
collectionName := rt.retrieve.CollectionName
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
if err != nil {
log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return err
}
log.Info("Get collection id by name.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
if err := ValidateCollectionName(rt.retrieve.CollectionName); err != nil {
log.Debug("Invalid collection name.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return err
}
log.Info("Validate collection name.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
for _, tag := range rt.retrieve.PartitionNames {
if err := ValidatePartitionTag(tag, false); err != nil {
log.Debug("Invalid partition name.", zap.Any("partitionName", tag),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return err
}
}
log.Info("Validate partition names.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
rt.Base.MsgType = commonpb.MsgType_Retrieve
rt.Ids = rt.retrieve.Ids
......@@ -1230,14 +1242,20 @@ func (rt *RetrieveTask) PreExecute(ctx context.Context) error {
partitionsMap, err := globalMetaCache.GetPartitions(ctx, collectionName)
if err != nil {
log.Debug("Failed to get partitions in collection.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return err
}
log.Info("Get partitions in collection.", zap.Any("collectionName", collectionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
partitionsRecord := make(map[UniqueID]bool)
for _, partitionName := range rt.retrieve.PartitionNames {
pattern := fmt.Sprintf("^%s$", partitionName)
re, err := regexp.Compile(pattern)
if err != nil {
log.Debug("Failed to compile partition name regex expression.", zap.Any("partitionName", partitionName),
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return errors.New("invalid partition names")
}
found := false
......@@ -1251,11 +1269,14 @@ func (rt *RetrieveTask) PreExecute(ctx context.Context) error {
}
}
if !found {
// FIXME(wxyu): undefined behavior
errMsg := fmt.Sprintf("PartitonName: %s not found", partitionName)
return errors.New(errMsg)
}
}
log.Info("Retrieve PreExecute done.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return nil
}
......@@ -1278,8 +1299,12 @@ func (rt *RetrieveTask) Execute(ctx context.Context) error {
err := rt.queryMsgStream.Produce(&msgPack)
log.Debug("proxynode", zap.Int("length of retrieveMsg", len(msgPack.Msgs)))
if err != nil {
log.Debug("proxynode", zap.String("send retrieve request failed", err.Error()))
log.Debug("Failed to send retrieve request.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
}
log.Info("Retrieve Execute done.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return err
}
......@@ -1288,68 +1313,79 @@ func (rt *RetrieveTask) PostExecute(ctx context.Context) error {
defer func() {
log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
}()
for {
select {
case <-rt.TraceCtx().Done():
log.Debug("proxynode", zap.Int64("Retrieve: wait to finish failed, timeout!, taskID:", rt.ID()))
return fmt.Errorf("RetrieveTask:wait to finish failed, timeout : %d", rt.ID())
case retrieveResults := <-rt.resultBuf:
retrieveResult := make([]*internalpb.RetrieveResults, 0)
var reason string
for _, partialRetrieveResult := range retrieveResults {
if partialRetrieveResult.Status.ErrorCode == commonpb.ErrorCode_Success {
retrieveResult = append(retrieveResult, partialRetrieveResult)
} else {
reason += partialRetrieveResult.Status.Reason + "\n"
}
select {
case <-rt.TraceCtx().Done():
log.Debug("proxynode", zap.Int64("Retrieve: wait to finish failed, timeout!, taskID:", rt.ID()))
return fmt.Errorf("RetrieveTask:wait to finish failed, timeout : %d", rt.ID())
case retrieveResults := <-rt.resultBuf:
retrieveResult := make([]*internalpb.RetrieveResults, 0)
var reason string
for _, partialRetrieveResult := range retrieveResults {
if partialRetrieveResult.Status.ErrorCode == commonpb.ErrorCode_Success {
retrieveResult = append(retrieveResult, partialRetrieveResult)
} else {
reason += partialRetrieveResult.Status.Reason + "\n"
}
}
availableQueryNodeNum := len(retrieveResult)
if availableQueryNodeNum <= 0 {
rt.result = &milvuspb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: reason,
},
}
return errors.New(reason)
if len(retrieveResult) == 0 {
rt.result = &milvuspb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: reason,
},
}
log.Debug("Retrieve failed on all querynodes.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return errors.New(reason)
}
availableQueryNodeNum = 0
for _, partialRetrieveResult := range retrieveResult {
if partialRetrieveResult.Ids == nil {
reason += "ids is nil\n"
availableQueryNodeNum := 0
for _, partialRetrieveResult := range retrieveResult {
if partialRetrieveResult.Ids == nil {
reason += "ids is nil\n"
continue
} else {
intIds, intOk := partialRetrieveResult.Ids.IdField.(*schemapb.IDs_IntId)
strIds, strOk := partialRetrieveResult.Ids.IdField.(*schemapb.IDs_StrId)
if !intOk && !strOk {
reason += "ids is empty\n"
continue
} else {
intIds, intOk := partialRetrieveResult.Ids.IdField.(*schemapb.IDs_IntId)
strIds, strOk := partialRetrieveResult.Ids.IdField.(*schemapb.IDs_StrId)
if !intOk && !strOk {
reason += "ids is empty\n"
continue
}
if !intOk {
rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data = append(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data, intIds.IntId.Data...)
} else {
rt.result.Ids.IdField.(*schemapb.IDs_StrId).StrId.Data = append(rt.result.Ids.IdField.(*schemapb.IDs_StrId).StrId.Data, strIds.StrId.Data...)
}
}
if !intOk {
rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data = append(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data, intIds.IntId.Data...)
} else {
rt.result.Ids.IdField.(*schemapb.IDs_StrId).StrId.Data = append(rt.result.Ids.IdField.(*schemapb.IDs_StrId).StrId.Data, strIds.StrId.Data...)
}
availableQueryNodeNum++
}
availableQueryNodeNum++
}
if availableQueryNodeNum <= 0 {
rt.result = &milvuspb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: reason,
},
}
return nil
if availableQueryNodeNum == 0 {
log.Info("Not any valid result found.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
rt.result = &milvuspb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: reason,
},
}
return nil
}
rt.result = &milvuspb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: reason,
},
}
}
log.Info("Retrieve PostExecute done.",
zap.Any("requestID", rt.Base.MsgID), zap.Any("requestType", "retrieve"))
return nil
}
type HasCollectionTask struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册