未验证 提交 783460b2 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18835 from taosdata/feature/stream

enh: tolerate exec error
...@@ -21,12 +21,12 @@ ...@@ -21,12 +21,12 @@
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "scheduler.h" #include "scheduler.h"
#include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "version.h" #include "version.h"
#include "tdatablock.h"
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
...@@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { ...@@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) {
return; return;
} }
tscDebug("taos free res %p", res);
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
...@@ -796,7 +798,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c ...@@ -796,7 +798,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs(); pRequest->metric.ctgEnd = taosGetTimestampUs();
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
tstrerror(code));
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pWrapper->pCatalogReq->forceUpdate = false; pWrapper->pCatalogReq->forceUpdate = false;
......
...@@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
} }
tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
......
...@@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
/*ASSERT(false);*/ /*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr()); terrstr());
continue;
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册