diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9f24deff94adf5da052d70987e8b9bc926d3faf0..d32b5a1bebb79af458e36c793538b2b11985d461 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1243,7 +1243,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { /*pRspWrapper->vgHandle = pVg;*/ /*pRspWrapper->topicHandle = pTopic;*/ taosWriteQitem(tmq->mqueue, pRspWrapper); - tsem_post(&tmq->rspSem); } goto CREATE_MSG_FAIL; @@ -1923,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { + while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { if (retryCnt++ > 40) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5d3350a69a463ade11e51bb731e2041978d600a8..cd67dc23ad15d30053ff5c46c8f2a94710ec1bc5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -169,18 +169,6 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { SMqDataRsp* pRsp = &pPushEntry->dataRsp; -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - A(!pRsp->withSchema); - A(taosArrayGetSize(pRsp->blockSchema) == 0); - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } -#endif - int32_t len = 0; int32_t code = 0; tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); @@ -224,22 +212,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { } int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - A(!pRsp->withSchema); - A(taosArrayGetSize(pRsp->blockSchema) == 0); - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - if (pRsp->blockNum > 0) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } else { - A(pRsp->rspOffset.version >= pRsp->reqOffset.version); - } - } -#endif - int32_t len = 0; int32_t code = 0; tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); @@ -282,25 +254,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con } int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - if (pRsp->withSchema) { - A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); - } else { - A(taosArrayGetSize(pRsp->blockSchema) == 0); - } - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - if (pRsp->blockNum > 0) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } else { - A(pRsp->rspOffset.version >= pRsp->reqOffset.version); - } - } -#endif - int32_t len = 0; int32_t code = 0; tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); @@ -429,18 +382,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su } pRsp->withTbName = 0; -#if 0 - pRsp->withTbName = pReq->withTbName; - if (pRsp->withTbName) { - pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); - if (pRsp->blockTbName == NULL) { - // TODO free - return -1; - } - } -#endif - - /*A(subType == TOPIC_SUB_TYPE__COLUMN);*/ pRsp->withSchema = false; return 0; @@ -992,7 +933,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.pTSchema = tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version); - ASSERT(pTask->tbSink.pTSchema); + if(pTask->tbSink.pTSchema == NULL) { + return -1; + } } streamSetupTrigger(pTask); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f97c5ce93c7b46c74946d87186f405c2b93101e8..f7eba3fbc183579d149f62d7f5e3cac4a78e01b2 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -113,7 +113,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - ASSERT(!(pRsp->withTbName || pRsp->withSchema)); + if(pRsp->withTbName || pRsp->withSchema){ + tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema); + return -1; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 5a25d7e89468b29e6c597c555d27afb259f09b63..e80cb24216c394b52646b37ab3696fd50ee95c8a 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -265,7 +265,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) .msgLen = len, .ver = ver, }; - qStreamSetScanMemData(task, submit); + if(qStreamSetScanMemData(task, submit) != 0){ + continue; + } // here start to scan submit block to extract the subscribed data while (1) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ac78ddc23cbbf3fbdcaa3305e4acdebcebc47aa2..af625be0b19610472b3613a1934908d822c64661 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -992,19 +992,16 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); return &pTaskInfo->streamInfo.metaRsp; } int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); return pTaskInfo->streamInfo.prepareStatus.uid; } int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); return 0; } @@ -1040,20 +1037,12 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s return TSDB_CODE_SUCCESS; } -#if 0 -int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - ASSERT(pTaskInfo->streamInfo.pReq == NULL); - pTaskInfo->streamInfo.pReq = pReq; - pTaskInfo->streamInfo.scanVer = scanVer; - return 0; -} -#endif - int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL)); + if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){ + qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr); + return -1; + } qDebug("set the submit block for future scan"); pTaskInfo->streamInfo.submit = submit; @@ -1063,7 +1052,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.returned = 0; @@ -1076,7 +1064,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - ASSERT(pOperator->numOfDownstream == 1); + if(pOperator->numOfDownstream != 1){ + qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); + return -1; + } pOperator = pOperator->pDownstream[0]; } @@ -1087,6 +1078,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTSInfo->base.dataReader = NULL; // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { + qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1101,6 +1093,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT uid = pTableInfo->uid; ts = INT64_MIN; } else { + qError("uid == 0 and tablelist size is 0"); return -1; } } @@ -1124,7 +1117,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } // TODO after dropping table, table may not found - ASSERT(found); + if(!found){ + qError("uid not found in tablelist %" PRId64, uid); + return -1; + } if (pTableScanInfo->base.dataReader == NULL) { STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); @@ -1133,7 +1129,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || pTableScanInfo->base.dataReader == NULL) { - ASSERT(0); + qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid); + return -1; } } @@ -1148,7 +1145,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, pTableScanInfo->currentTable, numOfTables); } else { - ASSERT(0); + qError("invalid pOffset->type:%d", pOffset->type); + return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; @@ -1180,7 +1178,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); - ASSERT(size == 1); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); @@ -1209,7 +1206,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; - assert(pMsg->info.ahandle != NULL); + if(pMsg->info.ahandle == NULL){ + qError("pMsg->info.ahandle is NULL"); + return; + } SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};