提交 31f57435 编写于 作者: L Liu Jicong

fix:log empty msg

上级 15ad147c
...@@ -58,7 +58,11 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -58,7 +58,11 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from snode-stream queue", pMsg); dTrace("msg:%p, get from snode-stream queue", pMsg);
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg); int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
if (code < 0) { if (code < 0) {
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code)); if (pMsg) {
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
} else {
dGError("snd, msg:%p failed to process stream empty msg since %s", pMsg, terrstr(code));
}
smSendRsp(pMsg, terrno); smSendRsp(pMsg, terrno);
} }
......
...@@ -86,8 +86,12 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -86,8 +86,12 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), if (pMsg) {
terrstr(code)); dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
terrstr(code));
} else {
dGError("vgId:%d, msg:%p failed to process stream empty msg since %s", pVnode->vgId, pMsg, terrstr(code));
}
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
......
...@@ -157,7 +157,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -157,7 +157,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); if (pMsg) {
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
} else {
dGTrace("msg:%p, failed to process empty msg since %s", pMsg, terrstr());
}
if (IsReq(pRpc)) { if (IsReq(pRpc)) {
SRpcMsg rsp = {.code = code, .info = pRpc->info}; SRpcMsg rsp = {.code = code, .info = pRpc->info};
......
...@@ -231,7 +231,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -231,7 +231,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->upstreamTaskId; int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
......
...@@ -139,7 +139,7 @@ if $data35 != 3 then ...@@ -139,7 +139,7 @@ if $data35 != 3 then
endi endi
sql insert into t1 values(1648791223001,12,14,13,11.1); sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500 sleep 1000
sql select * from streamt; sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c) print count(*) , count(d) , sum(a) , max(b) , min(c)
...@@ -256,7 +256,7 @@ if $data35 != 3 then ...@@ -256,7 +256,7 @@ if $data35 != 3 then
endi endi
sql insert into t1 values(1648791223002,12,14,13,11.1); sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -286,7 +286,7 @@ if $data15 != 13 then ...@@ -286,7 +286,7 @@ if $data15 != 13 then
endi endi
sql insert into t1 values(1648791223003,12,14,13,11.1); sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -318,7 +318,7 @@ endi ...@@ -318,7 +318,7 @@ endi
sql insert into t1 values(1648791223001,1,1,1,1.1); sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1); sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1); sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -350,7 +350,7 @@ endi ...@@ -350,7 +350,7 @@ endi
sql insert into t1 values(1648791233003,3,2,3,2.1); sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1); sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2 # row 2
...@@ -380,7 +380,7 @@ if $data25 != 3 then ...@@ -380,7 +380,7 @@ if $data25 != 3 then
endi endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0 # row 0
...@@ -410,7 +410,7 @@ if $data05 != 3 then ...@@ -410,7 +410,7 @@ if $data05 != 3 then
endi endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -594,7 +594,7 @@ if $data35 != 3 then ...@@ -594,7 +594,7 @@ if $data35 != 3 then
endi endi
sql insert into t1 values(1648791223001,12,14,13,11.1); sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500 sleep 1000
sql select * from streamt; sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c) print count(*) , count(d) , sum(a) , max(b) , min(c)
...@@ -711,7 +711,7 @@ if $data35 != 3 then ...@@ -711,7 +711,7 @@ if $data35 != 3 then
endi endi
sql insert into t1 values(1648791223002,12,14,13,11.1); sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -741,7 +741,7 @@ if $data15 != 13 then ...@@ -741,7 +741,7 @@ if $data15 != 13 then
endi endi
sql insert into t1 values(1648791223003,12,14,13,11.1); sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -773,7 +773,7 @@ endi ...@@ -773,7 +773,7 @@ endi
sql insert into t1 values(1648791223001,1,1,1,1.1); sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1); sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1); sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
...@@ -805,7 +805,7 @@ endi ...@@ -805,7 +805,7 @@ endi
sql insert into t1 values(1648791233003,3,2,3,2.1); sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1); sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2 # row 2
...@@ -835,7 +835,7 @@ if $data25 != 3 then ...@@ -835,7 +835,7 @@ if $data25 != 3 then
endi endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0 # row 0
...@@ -865,7 +865,7 @@ if $data05 != 3 then ...@@ -865,7 +865,7 @@ if $data05 != 3 then
endi endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100 sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册