未验证 提交 e80f06c0 编写于 作者: X Xiangyu Wang 提交者: GitHub

Add retrieve process log (#5561)

Signed-off-by: NXiangyu Wang <xiangyu.wang@zilliz.com>
上级 786c335b
...@@ -132,20 +132,26 @@ func (rc *retrieveCollection) receiveRetrieveMsg() { ...@@ -132,20 +132,26 @@ func (rc *retrieveCollection) receiveRetrieveMsg() {
log.Debug("stop retrieveCollection's receiveRetrieveMsg", zap.Int64("collectionID", rc.collectionID)) log.Debug("stop retrieveCollection's receiveRetrieveMsg", zap.Int64("collectionID", rc.collectionID))
return return
case rm := <-rc.msgBuffer: case rm := <-rc.msgBuffer:
log.Info("RetrieveCollection get retrieve message from msgBuffer",
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) sp, ctx := trace.StartSpanFromContext(rm.TraceCtx())
rm.SetTraceCtx(ctx) rm.SetTraceCtx(ctx)
log.Debug("get retrieve message from msgBuffer",
zap.Int64("msgID", rm.ID()),
zap.Int64("collectionID", rm.CollectionID))
serviceTime := rc.getServiceableTime() serviceTime := rc.getServiceableTime()
if rm.BeginTs() > serviceTime { if rm.BeginTs() > serviceTime {
bt, _ := tsoutil.ParseTS(rm.BeginTs()) bt, _ := tsoutil.ParseTS(rm.BeginTs())
st, _ := tsoutil.ParseTS(serviceTime) st, _ := tsoutil.ParseTS(serviceTime)
log.Debug("querynode::receiveRetrieveMsg: add to unsolvedMsgs", log.Debug("Timestamp of retrieve request great than serviceTime, add to unsolvedMsgs",
zap.Any("sm.BeginTs", bt), zap.Any("sm.BeginTs", bt),
zap.Any("serviceTime", st), zap.Any("serviceTime", st),
zap.Any("delta seconds", (rm.BeginTs()-serviceTime)/(1000*1000*1000)), zap.Any("delta seconds", (rm.BeginTs()-serviceTime)/(1000*1000*1000)),
zap.Any("collectionID", rc.collectionID), zap.Any("collectionID", rc.collectionID),
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
) )
rc.addToUnsolvedMsg(rm) rc.addToUnsolvedMsg(rm)
sp.LogFields( sp.LogFields(
...@@ -157,23 +163,42 @@ func (rc *retrieveCollection) receiveRetrieveMsg() { ...@@ -157,23 +163,42 @@ func (rc *retrieveCollection) receiveRetrieveMsg() {
sp.Finish() sp.Finish()
continue continue
} }
log.Debug("doing retrieve in receiveRetrieveMsg...",
zap.Int64("msgID", rm.ID()), log.Info("Doing retrieve in receiveRetrieveMsg...",
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
err := rc.retrieve(rm) err := rc.retrieve(rm)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error(),
log.Debug("do retrieve failed in receiveRetrieveMsg, prepare to publish failed retrieve result", zap.Int64("requestID", rm.ID()),
zap.Int64("msgID", rm.ID()), zap.Any("requestType", "retrieve"),
zap.Int64("collectionID", rm.CollectionID)) )
log.Debug("Failed to execute retrieve, prepare to publish failed retrieve result",
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
err2 := rc.publishFailedRetrieveResult(rm, err.Error()) err2 := rc.publishFailedRetrieveResult(rm, err.Error())
if err2 != nil { if err2 != nil {
log.Error("publish FailedRetrieveResult failed", zap.Error(err2)) log.Error("Failed to publish FailedRetrieveResult",
zap.Error(err2),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
} }
} }
log.Debug("do retrieve done in retrieveRetrieveMsg",
log.Debug("Do retrieve done in retrieveRetrieveMsg",
zap.Int64("msgID", rm.ID()), zap.Int64("msgID", rm.ID()),
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
sp.Finish() sp.Finish()
} }
} }
...@@ -188,12 +213,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { ...@@ -188,12 +213,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() {
default: default:
serviceTime := rc.waitNewTSafe() serviceTime := rc.waitNewTSafe()
rc.setServiceableTime(serviceTime) rc.setServiceableTime(serviceTime)
log.Debug("querynode::doUnsolvedMsgRetrieve: setServiceableTime", log.Debug("Update serviceTime",
zap.Any("serviceTime", serviceTime), zap.Any("serviceTime", serviceTime),
) zap.Uint64("tSafe", serviceTime),
log.Debug("get tSafe from flow graph",
zap.Int64("collectionID", rc.collectionID), zap.Int64("collectionID", rc.collectionID),
zap.Uint64("tSafe", serviceTime)) )
retrieveMsg := make([]*msgstream.RetrieveMsg, 0) retrieveMsg := make([]*msgstream.RetrieveMsg, 0)
rc.unsolvedMsgMu.Lock() rc.unsolvedMsgMu.Lock()
...@@ -202,9 +226,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { ...@@ -202,9 +226,11 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() {
rc.unsolvedMsgMu.Unlock() rc.unsolvedMsgMu.Unlock()
for _, rm := range tmpMsg { for _, rm := range tmpMsg {
log.Debug("get retrieve message from unsolvedMsg", log.Debug("Get retrieve message from unsolvedMsg",
zap.Int64("msgID", rm.ID()), zap.Int64("collectionID", rm.CollectionID),
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
if rm.EndTs() <= serviceTime { if rm.EndTs() <= serviceTime {
retrieveMsg = append(retrieveMsg, rm) retrieveMsg = append(retrieveMsg, rm)
...@@ -216,27 +242,46 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() { ...@@ -216,27 +242,46 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() {
if len(retrieveMsg) <= 0 { if len(retrieveMsg) <= 0 {
continue continue
} }
for _, rm := range retrieveMsg { for _, rm := range retrieveMsg {
sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) sp, ctx := trace.StartSpanFromContext(rm.TraceCtx())
rm.SetTraceCtx(ctx) rm.SetTraceCtx(ctx)
log.Debug("doing retrieve in doUnsolvedMsgRetrieve...",
zap.Int64("msgID", rm.ID()), log.Debug("Doing retrieve in doUnsolvedMsgRetrieve...",
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
err := rc.retrieve(rm) err := rc.retrieve(rm)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error(),
log.Debug("do retrieve failed in doUnsolvedMsgRetrieve, prepare to publish failed retrieve result", zap.Int64("requestID", rm.ID()),
zap.Int64("msgID", rm.ID()), zap.Any("requestType", "retrieve"),
zap.Int64("collectionID", rm.CollectionID)) )
log.Debug("Failed to do retrieve in doUnsolvedMsgRetrieve, prepare to publish failed retrieve result",
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
err2 := rc.publishFailedRetrieveResult(rm, err.Error()) err2 := rc.publishFailedRetrieveResult(rm, err.Error())
if err2 != nil { if err2 != nil {
log.Error("publish FailedRetrieveResult failed", zap.Error(err2)) log.Error("Failed to publish FailedRetrieveResult",
zap.Error(err2),
zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
} }
} }
sp.Finish() sp.Finish()
log.Debug("do retrieve done in doUnsolvedMsgRetrieve", log.Debug("Do retrieve done in doUnsolvedMsgRetrieve",
zap.Int64("msgID", rm.ID()), zap.Int64("collectionID", rm.CollectionID),
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("requestID", rm.ID()),
zap.Any("requestType", "retrieve"),
)
} }
log.Debug("doUnsolvedMsgRetrieve, do retrieve done", zap.Int("num of retrieveMsg", len(retrieveMsg))) log.Debug("doUnsolvedMsgRetrieve, do retrieve done", zap.Int("num of retrieveMsg", len(retrieveMsg)))
} }
......
...@@ -100,24 +100,44 @@ func (rs *retrieveService) consumeRetrieve() { ...@@ -100,24 +100,44 @@ func (rs *retrieveService) consumeRetrieve() {
continue continue
} }
for _, msg := range msgPack.Msgs { for _, msg := range msgPack.Msgs {
log.Debug("consume retrieve message", zap.Int64("msgID", msg.ID()))
rm, ok := msg.(*msgstream.RetrieveMsg) rm, ok := msg.(*msgstream.RetrieveMsg)
if !ok { if !ok {
// Not a retrieve request, discard
continue continue
} }
log.Info("RetrieveService consume retrieve message",
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", msg.ID()),
zap.Any("requestType", "retrieve"),
)
sp, ctx := trace.StartSpanFromContext(rm.TraceCtx()) sp, ctx := trace.StartSpanFromContext(rm.TraceCtx())
rm.SetTraceCtx(ctx) rm.SetTraceCtx(ctx)
err := rs.collectionCheck(rm.CollectionID) err := rs.collectionCheck(rm.CollectionID)
if err != nil { if err != nil {
log.Debug("Failed to check collection exist, discard.",
zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", msg.ID()),
zap.Any("requestType", "retrieve"),
)
continue continue
} }
_, ok = rs.retrieveCollections[rm.CollectionID] _, ok = rs.retrieveCollections[rm.CollectionID]
if !ok { if !ok {
rs.startRetrieveCollection(rm.CollectionID) rs.startRetrieveCollection(rm.CollectionID)
log.Debug("new retrieve collection, start retrieve collection service", log.Debug("Receive retrieve request on new collection, start an new retrieve collection service",
zap.Int64("collectionID", rm.CollectionID)) zap.Int64("collectionID", rm.CollectionID),
zap.Int64("requestID", msg.ID()),
zap.Any("requestType", "retrieve"),
)
} }
rs.retrieveCollections[rm.CollectionID].msgBuffer <- rm rs.retrieveCollections[rm.CollectionID].msgBuffer <- rm
log.Info("Put retrieve msg into msgBuffer",
zap.Any("requestID", msg.ID),
zap.Any("requestType", "retrieve"),
)
sp.Finish() sp.Finish()
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册