diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f2dec7217f8adb3e1b01529687173be449c3275a..8f029e6061712ba1ede9f843aba114740d3633ce 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -21,12 +21,12 @@ #include "os.h" #include "query.h" #include "scheduler.h" +#include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tdatablock.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { return; } + tscDebug("taos free res %p", res); + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); @@ -796,7 +798,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, + tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { pWrapper->pCatalogReq->forceUpdate = false; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 766a94caf1e974d9ea94b5c7a915cfc076769bcc..db717a4e4e0bbbc6f4f714255800bf54184c7108 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); + taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } + tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6a83a9a4da6cdc95abe77502539343c46f094a60..d20c2902f5a5d7bca3d408399d5666c88d5b721b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); + continue; } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {