diff --git a/internal/querynode/retrieve_collection.go b/internal/querynode/retrieve_collection.go index 6a82174cfd94d1006d158bc85864c1d05a710e56..877ce606ec672db31dced4c768b5fe01b2f1090d 100644 --- a/internal/querynode/retrieve_collection.go +++ b/internal/querynode/retrieve_collection.go @@ -132,20 +132,26 @@ func (rc *retrieveCollection) receiveRetrieveMsg() { log.Debug("stop retrieveCollection's receiveRetrieveMsg", zap.Int64("collectionID", rc.collectionID)) return case rm := <-rc.msgBuffer: + log.Info("RetrieveCollection get retrieve message from msgBuffer", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) + sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) rm.SetTraceCtx(ctx) - log.Debug("get retrieve message from msgBuffer", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) serviceTime := rc.getServiceableTime() if rm.BeginTs() > serviceTime { bt, _ := tsoutil.ParseTS(rm.BeginTs()) st, _ := tsoutil.ParseTS(serviceTime) - log.Debug("querynode::receiveRetrieveMsg: add to unsolvedMsgs", + log.Debug("Timestamp of retrieve request great than serviceTime, add to unsolvedMsgs", zap.Any("sm.BeginTs", bt), zap.Any("serviceTime", st), zap.Any("delta seconds", (rm.BeginTs()-serviceTime)/(1000*1000*1000)), zap.Any("collectionID", rc.collectionID), + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), ) rc.addToUnsolvedMsg(rm) sp.LogFields( @@ -157,23 +163,42 @@ func (rc *retrieveCollection) receiveRetrieveMsg() { sp.Finish() continue } - log.Debug("doing retrieve in receiveRetrieveMsg...", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + + log.Info("Doing retrieve in receiveRetrieveMsg...", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) err := rc.retrieve(rm) + if err != nil { - log.Error(err.Error()) - log.Debug("do retrieve failed in receiveRetrieveMsg, prepare to publish failed retrieve result", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + log.Error(err.Error(), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) + + log.Debug("Failed to execute retrieve, prepare to publish failed retrieve result", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) + err2 := rc.publishFailedRetrieveResult(rm, err.Error()) if err2 != nil { - log.Error("publish FailedRetrieveResult failed", zap.Error(err2)) + log.Error("Failed to publish FailedRetrieveResult", + zap.Error(err2), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) } } - log.Debug("do retrieve done in retrieveRetrieveMsg", + + log.Debug("Do retrieve done in retrieveRetrieveMsg", zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) sp.Finish() } } @@ -188,12 +213,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { default: serviceTime := rc.waitNewTSafe() rc.setServiceableTime(serviceTime) - log.Debug("querynode::doUnsolvedMsgRetrieve: setServiceableTime", + log.Debug("Update serviceTime", zap.Any("serviceTime", serviceTime), - ) - log.Debug("get tSafe from flow graph", + zap.Uint64("tSafe", serviceTime), zap.Int64("collectionID", rc.collectionID), - zap.Uint64("tSafe", serviceTime)) + ) retrieveMsg := make([]*msgstream.RetrieveMsg, 0) rc.unsolvedMsgMu.Lock() @@ -202,9 +226,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { rc.unsolvedMsgMu.Unlock() for _, rm := range tmpMsg { - log.Debug("get retrieve message from unsolvedMsg", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + log.Debug("Get retrieve message from unsolvedMsg", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) if rm.EndTs() <= serviceTime { retrieveMsg = append(retrieveMsg, rm) @@ -216,27 +242,46 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { if len(retrieveMsg) <= 0 { continue } + for _, rm := range retrieveMsg { sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) rm.SetTraceCtx(ctx) - log.Debug("doing retrieve in doUnsolvedMsgRetrieve...", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + + log.Debug("Doing retrieve in doUnsolvedMsgRetrieve...", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) err := rc.retrieve(rm) + if err != nil { - log.Error(err.Error()) - log.Debug("do retrieve failed in doUnsolvedMsgRetrieve, prepare to publish failed retrieve result", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + log.Error(err.Error(), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) + + log.Debug("Failed to do retrieve in doUnsolvedMsgRetrieve, prepare to publish failed retrieve result", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) + err2 := rc.publishFailedRetrieveResult(rm, err.Error()) if err2 != nil { - log.Error("publish FailedRetrieveResult failed", zap.Error(err2)) + log.Error("Failed to publish FailedRetrieveResult", + zap.Error(err2), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) } } + sp.Finish() - log.Debug("do retrieve done in doUnsolvedMsgRetrieve", - zap.Int64("msgID", rm.ID()), - zap.Int64("collectionID", rm.CollectionID)) + log.Debug("Do retrieve done in doUnsolvedMsgRetrieve", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", rm.ID()), + zap.Any("requestType", "retrieve"), + ) } log.Debug("doUnsolvedMsgRetrieve, do retrieve done", zap.Int("num of retrieveMsg", len(retrieveMsg))) } diff --git a/internal/querynode/retrieve_service.go b/internal/querynode/retrieve_service.go index 0af7da93218cc1d126ad17ecbe90c377bb834f69..6a8ad10dfbb336779aaa8113a734f5a4efef4839 100644 --- a/internal/querynode/retrieve_service.go +++ b/internal/querynode/retrieve_service.go @@ -100,24 +100,44 @@ func (rs *retrieveService) consumeRetrieve() { continue } for _, msg := range msgPack.Msgs { - log.Debug("consume retrieve message", zap.Int64("msgID", msg.ID())) rm, ok := msg.(*msgstream.RetrieveMsg) if !ok { + // Not a retrieve request, discard continue } + log.Info("RetrieveService consume retrieve message", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", msg.ID()), + zap.Any("requestType", "retrieve"), + ) + sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) rm.SetTraceCtx(ctx) err := rs.collectionCheck(rm.CollectionID) if err != nil { + log.Debug("Failed to check collection exist, discard.", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", msg.ID()), + zap.Any("requestType", "retrieve"), + ) continue } + _, ok = rs.retrieveCollections[rm.CollectionID] if !ok { rs.startRetrieveCollection(rm.CollectionID) - log.Debug("new retrieve collection, start retrieve collection service", - zap.Int64("collectionID", rm.CollectionID)) + log.Debug("Receive retrieve request on new collection, start an new retrieve collection service", + zap.Int64("collectionID", rm.CollectionID), + zap.Int64("requestID", msg.ID()), + zap.Any("requestType", "retrieve"), + ) } + rs.retrieveCollections[rm.CollectionID].msgBuffer <- rm + log.Info("Put retrieve msg into msgBuffer", + zap.Any("requestID", msg.ID), + zap.Any("requestType", "retrieve"), + ) sp.Finish() } }